From 6692b0c8954ad30751f57a5504b58532f8417793 Mon Sep 17 00:00:00 2001 From: Ilya Averyanov Date: Mon, 28 Nov 2022 18:20:15 +0300 Subject: [PATCH] feat(bridge): add Redis bridge --- .ci/docker-compose-file/Makefile.local | 14 +- .../docker-compose-redis-cluster-tcp.yaml | 4 +- .../docker-compose-redis-cluster-tls.yaml | 4 +- .ci/docker-compose-file/redis/.gitignore | 6 +- .ci/docker-compose-file/redis/redis-tls.conf | 7 +- .ci/docker-compose-file/redis/redis.conf | 1 + .ci/docker-compose-file/redis/redis.sh | 91 ++-- .ci/docker-compose-file/toxiproxy.json | 7 + apps/emqx/test/emqx_common_test_helpers.erl | 5 +- .../test/emqx_authz_api_sources_SUITE.erl | 1 - apps/emqx_bridge/include/emqx_bridge.hrl | 16 + apps/emqx_bridge/src/emqx_bridge.erl | 19 +- .../src/emqx_connector_redis.erl | 77 ++- .../test/emqx_connector_redis_SUITE.erl | 27 +- lib-ee/emqx_ee_bridge/docker-ct | 2 + .../i18n/emqx_ee_bridge_redis.conf | 73 +++ lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 32 +- .../src/emqx_ee_bridge_redis.erl | 193 +++++++ .../test/emqx_ee_bridge_redis_SUITE.erl | 493 ++++++++++++++++++ .../src/emqx_ee_connector_redis.erl | 138 +++++ scripts/ct/run.sh | 4 + 21 files changed, 1117 insertions(+), 97 deletions(-) create mode 100644 lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_redis.conf create mode 100644 lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl create mode 100644 lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl create mode 100644 lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl diff --git a/.ci/docker-compose-file/Makefile.local b/.ci/docker-compose-file/Makefile.local index 026cc7a1d..d11ab64a6 100644 --- a/.ci/docker-compose-file/Makefile.local +++ b/.ci/docker-compose-file/Makefile.local @@ -16,7 +16,7 @@ up: REDIS_TAG=6 \ MONGO_TAG=5 \ PGSQL_TAG=13 \ - docker-compose \ + docker compose \ -f .ci/docker-compose-file/docker-compose.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \ @@ -28,10 +28,13 @@ up: -f .ci/docker-compose-file/docker-compose-redis-single-tls.yaml \ -f .ci/docker-compose-file/docker-compose-redis-sentinel-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-redis-sentinel-tls.yaml \ - up -d --build + -f .ci/docker-compose-file/docker-compose-redis-cluster-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-redis-cluster-tls.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ + up -d --build --remove-orphans down: - docker-compose \ + docker compose \ -f .ci/docker-compose-file/docker-compose.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-mongo-single-tls.yaml \ @@ -43,7 +46,10 @@ down: -f .ci/docker-compose-file/docker-compose-redis-single-tls.yaml \ -f .ci/docker-compose-file/docker-compose-redis-sentinel-tcp.yaml \ -f .ci/docker-compose-file/docker-compose-redis-sentinel-tls.yaml \ - down + -f .ci/docker-compose-file/docker-compose-redis-cluster-tcp.yaml \ + -f .ci/docker-compose-file/docker-compose-redis-cluster-tls.yaml \ + -f .ci/docker-compose-file/docker-compose-toxiproxy.yaml \ + down --remove-orphans ct: docker exec -i "$(CONTAINER)" bash -c "rebar3 ct --name 'test@127.0.0.1' -v --suite $(SUITE)" diff --git a/.ci/docker-compose-file/docker-compose-redis-cluster-tcp.yaml b/.ci/docker-compose-file/docker-compose-redis-cluster-tcp.yaml index 997388aa5..9c03fc65e 100644 --- a/.ci/docker-compose-file/docker-compose-redis-cluster-tcp.yaml +++ b/.ci/docker-compose-file/docker-compose-redis-cluster-tcp.yaml @@ -1,9 +1,9 @@ version: '3.9' services: - redis_server: + redis_cluster: image: redis:${REDIS_TAG} - container_name: redis + container_name: redis-cluster volumes: - ./redis/:/data/conf command: bash -c "/bin/bash /data/conf/redis.sh --node cluster && tail -f /var/log/redis-server.log" diff --git a/.ci/docker-compose-file/docker-compose-redis-cluster-tls.yaml b/.ci/docker-compose-file/docker-compose-redis-cluster-tls.yaml index c5cefd9e6..bfbf1a4a3 100644 --- a/.ci/docker-compose-file/docker-compose-redis-cluster-tls.yaml +++ b/.ci/docker-compose-file/docker-compose-redis-cluster-tls.yaml @@ -1,8 +1,8 @@ version: '3.9' services: - redis_server: - container_name: redis + redis_cluster_tls: + container_name: redis-cluster-tls image: redis:${REDIS_TAG} volumes: - ../../apps/emqx/etc/certs/cacert.pem:/etc/certs/ca.crt diff --git a/.ci/docker-compose-file/redis/.gitignore b/.ci/docker-compose-file/redis/.gitignore index b5947692d..23ffe8469 100644 --- a/.ci/docker-compose-file/redis/.gitignore +++ b/.ci/docker-compose-file/redis/.gitignore @@ -1,3 +1,3 @@ -r7000i.log -r7001i.log -r7002i.log +r700?i.log +nodes.700?.conf +*.rdb diff --git a/.ci/docker-compose-file/redis/redis-tls.conf b/.ci/docker-compose-file/redis/redis-tls.conf index e304c814f..c503dc2e8 100644 --- a/.ci/docker-compose-file/redis/redis-tls.conf +++ b/.ci/docker-compose-file/redis/redis-tls.conf @@ -1,11 +1,12 @@ daemonize yes bind 0.0.0.0 :: logfile /var/log/redis-server.log +protected-mode no +requirepass public +masterauth public + tls-cert-file /etc/certs/redis.crt tls-key-file /etc/certs/redis.key tls-ca-cert-file /etc/certs/ca.crt tls-replication yes tls-cluster yes -protected-mode no -requirepass public -masterauth public diff --git a/.ci/docker-compose-file/redis/redis.conf b/.ci/docker-compose-file/redis/redis.conf index 6181925db..484d9abf9 100644 --- a/.ci/docker-compose-file/redis/redis.conf +++ b/.ci/docker-compose-file/redis/redis.conf @@ -1,5 +1,6 @@ daemonize yes bind 0.0.0.0 :: logfile /var/log/redis-server.log +protected-mode no requirepass public masterauth public diff --git a/.ci/docker-compose-file/redis/redis.sh b/.ci/docker-compose-file/redis/redis.sh index b7cf62a60..be6462249 100755 --- a/.ci/docker-compose-file/redis/redis.sh +++ b/.ci/docker-compose-file/redis/redis.sh @@ -16,13 +16,8 @@ case $key in shift # past argument shift # past value ;; - -t) - tls="$2" - shift # past argument - shift # past value - ;; --tls-enabled) - tls=1 + tls=true shift # past argument ;; *) @@ -37,69 +32,71 @@ rm -f \ /data/conf/r7002i.log \ /data/conf/nodes.7000.conf \ /data/conf/nodes.7001.conf \ - /data/conf/nodes.7002.conf ; + /data/conf/nodes.7002.conf -if [ "${node}" = "cluster" ] ; then - if $tls ; then +if [ "$node" = "cluster" ]; then + if $tls; then redis-server /data/conf/redis-tls.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf \ - --tls-port 8000 --cluster-enabled yes ; + --tls-port 8000 --cluster-enabled yes redis-server /data/conf/redis-tls.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf \ - --tls-port 8001 --cluster-enabled yes; + --tls-port 8001 --cluster-enabled yes redis-server /data/conf/redis-tls.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf \ - --tls-port 8002 --cluster-enabled yes; + --tls-port 8002 --cluster-enabled yes else - redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf --cluster-enabled yes; - redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf --cluster-enabled yes; - redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf --cluster-enabled yes; + redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf \ + --cluster-enabled yes + redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf \ + --cluster-enabled yes + redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf \ + --cluster-enabled yes fi -elif [ "${node}" = "sentinel" ] ; then - if $tls ; then +elif [ "$node" = "sentinel" ]; then + if $tls; then redis-server /data/conf/redis-tls.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf \ - --tls-port 8000 --cluster-enabled no; + --tls-port 8000 --cluster-enabled no redis-server /data/conf/redis-tls.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf \ - --tls-port 8001 --cluster-enabled no --slaveof "$LOCAL_IP" 8000; + --tls-port 8001 --cluster-enabled no --slaveof "$LOCAL_IP" 8000 redis-server /data/conf/redis-tls.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf \ - --tls-port 8002 --cluster-enabled no --slaveof "$LOCAL_IP" 8000; + --tls-port 8002 --cluster-enabled no --slaveof "$LOCAL_IP" 8000 else redis-server /data/conf/redis.conf --port 7000 --cluster-config-file /data/conf/nodes.7000.conf \ - --cluster-enabled no; + --cluster-enabled no redis-server /data/conf/redis.conf --port 7001 --cluster-config-file /data/conf/nodes.7001.conf \ - --cluster-enabled no --slaveof "$LOCAL_IP" 7000; + --cluster-enabled no --slaveof "$LOCAL_IP" 7000 redis-server /data/conf/redis.conf --port 7002 --cluster-config-file /data/conf/nodes.7002.conf \ - --cluster-enabled no --slaveof "$LOCAL_IP" 7000; + --cluster-enabled no --slaveof "$LOCAL_IP" 7000 fi fi -REDIS_LOAD_FLG=true; + +REDIS_LOAD_FLG=true while $REDIS_LOAD_FLG; do - sleep 1; - redis-cli --pass public --no-auth-warning -p 7000 info 1> /data/conf/r7000i.log 2> /dev/null; - if [ -s /data/conf/r7000i.log ]; then - : - else - continue; + sleep 1 + redis-cli --pass public --no-auth-warning -p 7000 info 1> /data/conf/r7000i.log 2> /dev/null + if ! [ -s /data/conf/r7000i.log ]; then + continue fi - redis-cli --pass public --no-auth-warning -p 7001 info 1> /data/conf/r7001i.log 2> /dev/null; - if [ -s /data/conf/r7001i.log ]; then - : - else - continue; + redis-cli --pass public --no-auth-warning -p 7001 info 1> /data/conf/r7001i.log 2> /dev/null + if ! [ -s /data/conf/r7001i.log ]; then + continue fi redis-cli --pass public --no-auth-warning -p 7002 info 1> /data/conf/r7002i.log 2> /dev/null; - if [ -s /data/conf/r7002i.log ]; then - : - else - continue; + if ! [ -s /data/conf/r7002i.log ]; then + continue fi - if [ "${node}" = "cluster" ] ; then - if $tls ; then - yes "yes" | redis-cli --cluster create "$LOCAL_IP:8000" "$LOCAL_IP:8001" "$LOCAL_IP:8002" --pass public --no-auth-warning --tls true --cacert /etc/certs/ca.crt --cert /etc/certs/redis.crt --key /etc/certs/redis.key; + if [ "$node" = "cluster" ] ; then + if $tls; then + yes "yes" | redis-cli --cluster create "$LOCAL_IP:8000" "$LOCAL_IP:8001" "$LOCAL_IP:8002" \ + --pass public --no-auth-warning \ + --tls true --cacert /etc/certs/ca.crt \ + --cert /etc/certs/redis.crt --key /etc/certs/redis.key else - yes "yes" | redis-cli --cluster create "$LOCAL_IP:7000" "$LOCAL_IP:7001" "$LOCAL_IP:7002" --pass public --no-auth-warning; + yes "yes" | redis-cli --cluster create "$LOCAL_IP:7000" "$LOCAL_IP:7001" "$LOCAL_IP:7002" \ + --pass public --no-auth-warning fi - elif [ "${node}" = "sentinel" ] ; then + elif [ "$node" = "sentinel" ]; then tee /_sentinel.conf>/dev/null << EOF port 26379 bind 0.0.0.0 :: @@ -107,7 +104,7 @@ daemonize yes logfile /var/log/redis-server.log dir /tmp EOF - if $tls ; then + if $tls; then cat >>/_sentinel.conf< fun({Host, Port}) -> is_tcp_server_available(Host, Port) end, - lists:all(Fun, Servers). + case lists:partition(Fun, Servers) of + {_, []} -> true; + {_, Unavail} -> ct:print("Unavailable servers: ~p", [Unavail]) + end. -spec is_tcp_server_available( Host :: inet:socket_address() | inet:hostname(), diff --git a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl index e26ad9839..a8ae38444 100644 --- a/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl +++ b/apps/emqx_authz/test/emqx_authz_api_sources_SUITE.erl @@ -80,7 +80,6 @@ <<"servers">> => <>, <<"redis_type">> => <<"cluster">>, <<"pool_size">> => 1, - <<"database">> => 0, <<"password">> => <<"ee">>, <<"auto_reconnect">> => true, <<"ssl">> => #{<<"enable">> => false}, diff --git a/apps/emqx_bridge/include/emqx_bridge.hrl b/apps/emqx_bridge/include/emqx_bridge.hrl index 6bc80f9cc..ab0e895fa 100644 --- a/apps/emqx_bridge/include/emqx_bridge.hrl +++ b/apps/emqx_bridge/include/emqx_bridge.hrl @@ -1,3 +1,19 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + -define(EMPTY_METRICS, ?METRICS( 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0 diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index 44028e900..42d216c7c 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -52,7 +52,10 @@ T == webhook; T == mysql; T == influxdb_api_v1; - T == influxdb_api_v2 + T == influxdb_api_v2; + T == redis_single; + T == redis_sentinel; + T == redis_cluster %% T == influxdb_udp ). @@ -135,6 +138,7 @@ on_message_publish(Message = #message{topic = Topic, flags = Flags}) -> {ok, Message}. send_to_matched_egress_bridges(Topic, Msg) -> + MatchedBridgeIds = get_matched_bridges(Topic), lists:foreach( fun(Id) -> try send_message(Id, Msg) of @@ -157,7 +161,7 @@ send_to_matched_egress_bridges(Topic, Msg) -> }) end end, - get_matched_bridges(Topic) + MatchedBridgeIds ). send_message(BridgeId, Message) -> @@ -242,6 +246,12 @@ disable_enable(Action, BridgeType, BridgeName) when ). create(BridgeType, BridgeName, RawConf) -> + ?SLOG(debug, #{ + brige_action => create, + bridge_type => BridgeType, + bridge_name => BridgeName, + bridge_raw_config => RawConf + }), emqx_conf:update( emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], RawConf, @@ -249,6 +259,11 @@ create(BridgeType, BridgeName, RawConf) -> ). remove(BridgeType, BridgeName) -> + ?SLOG(debug, #{ + brige_action => remove, + bridge_type => BridgeType, + bridge_name => BridgeName + }), emqx_conf:remove( emqx_bridge:config_key_path() ++ [BridgeType, BridgeName], #{override_to => cluster} diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index a1e864f1d..5a77ba6ab 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -37,7 +37,7 @@ -export([connect/1]). --export([cmd/3]). +-export([do_cmd/3]). %% redis host don't need parse -define(REDIS_HOST_OPTIONS, #{ @@ -63,7 +63,8 @@ fields(single) -> [ {server, fun server/1}, {redis_type, #{ - type => hoconsc:enum([single]), + type => single, + default => single, required => true, desc => ?DESC("single") }} @@ -74,18 +75,20 @@ fields(cluster) -> [ {servers, fun servers/1}, {redis_type, #{ - type => hoconsc:enum([cluster]), + type => cluster, + default => cluster, required => true, desc => ?DESC("cluster") }} ] ++ - redis_fields() ++ + lists:keydelete(database, 1, redis_fields()) ++ emqx_connector_schema_lib:ssl_fields(); fields(sentinel) -> [ {servers, fun servers/1}, {redis_type, #{ - type => hoconsc:enum([sentinel]), + type => sentinel, + default => sentinel, required => true, desc => ?DESC("sentinel") }}, @@ -119,7 +122,6 @@ on_start( InstId, #{ redis_type := Type, - database := Database, pool_size := PoolSize, auto_reconnect := AutoReconn, ssl := SSL @@ -135,13 +137,17 @@ on_start( single -> [{servers, [maps:get(server, Config)]}]; _ -> [{servers, maps:get(servers, Config)}] end, + Database = + case Type of + cluster -> []; + _ -> [{database, maps:get(database, Config)}] + end, Opts = [ {pool_size, PoolSize}, - {database, Database}, {password, maps:get(password, Config, "")}, {auto_reconnect, reconn_interval(AutoReconn)} - ] ++ Servers, + ] ++ Database ++ Servers, Options = case maps:get(enable, SSL) of true -> @@ -157,9 +163,12 @@ on_start( case Type of cluster -> case eredis_cluster:start_pool(PoolName, Opts ++ [{options, Options}]) of - {ok, _} -> {ok, State}; - {ok, _, _} -> {ok, State}; - {error, Reason} -> {error, Reason} + {ok, _} -> + {ok, State}; + {ok, _, _} -> + {ok, State}; + {error, Reason} -> + {error, Reason} end; _ -> case @@ -180,23 +189,28 @@ on_stop(InstId, #{poolname := PoolName, type := Type}) -> _ -> emqx_plugin_libs_pool:stop_pool(PoolName) end. -on_query(InstId, {cmd, Command}, #{poolname := PoolName, type := Type} = State) -> +on_query(InstId, {cmd, _} = Query, State) -> + do_query(InstId, Query, State); +on_query(InstId, {cmds, _} = Query, State) -> + do_query(InstId, Query, State). + +do_query(InstId, Query, #{poolname := PoolName, type := Type} = State) -> ?TRACE( "QUERY", "redis_connector_received", - #{connector => InstId, sql => Command, state => State} + #{connector => InstId, query => Query, state => State} ), Result = case Type of - cluster -> eredis_cluster:q(PoolName, Command); - _ -> ecpool:pick_and_do(PoolName, {?MODULE, cmd, [Type, Command]}, no_handover) + cluster -> do_cmd(PoolName, cluster, Query); + _ -> ecpool:pick_and_do(PoolName, {?MODULE, do_cmd, [Type, Query]}, no_handover) end, case Result of {error, Reason} -> ?SLOG(error, #{ - msg => "redis_connector_do_cmd_query_failed", + msg => "redis_connector_do_query_failed", connector => InstId, - sql => Command, + query => Query, reason => Reason }); _ -> @@ -226,7 +240,7 @@ on_get_status(_InstId, #{type := cluster, poolname := PoolName, auto_reconnect : Health = eredis_cluster_workers_exist_and_are_connected(Workers), status_result(Health, AutoReconn); false -> - disconnect + disconnected end; on_get_status(_InstId, #{poolname := Pool, auto_reconnect := AutoReconn}) -> Health = emqx_plugin_libs_pool:health_check_ecpool_workers(Pool, fun ?MODULE:do_get_status/1), @@ -245,10 +259,29 @@ status_result(_Status = false, _AutoReconn = false) -> disconnected. reconn_interval(true) -> 15; reconn_interval(false) -> false. -cmd(Conn, cluster, Command) -> - eredis_cluster:q(Conn, Command); -cmd(Conn, _Type, Command) -> - eredis:q(Conn, Command). +do_cmd(PoolName, cluster, {cmd, Command}) -> + eredis_cluster:q(PoolName, Command); +do_cmd(Conn, _Type, {cmd, Command}) -> + eredis:q(Conn, Command); +do_cmd(PoolName, cluster, {cmds, Commands}) -> + wrap_qp_result(eredis_cluster:qp(PoolName, Commands)); +do_cmd(Conn, _Type, {cmds, Commands}) -> + wrap_qp_result(eredis:qp(Conn, Commands)). + +wrap_qp_result({error, _} = Error) -> + Error; +wrap_qp_result(Results) when is_list(Results) -> + AreAllOK = lists:all( + fun + ({ok, _}) -> true; + ({error, _}) -> false + end, + Results + ), + case AreAllOK of + true -> {ok, Results}; + false -> {error, Results} + end. %% =================================================================== connect(Opts) -> diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl index d9199d2d6..e67dced2f 100644 --- a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -111,6 +111,14 @@ perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), % Perform query as further check that the resource is working as expected ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), + ?assertEqual( + {ok, [{ok, <<"PONG">>}, {ok, <<"PONG">>}]}, + emqx_resource:query(PoolName, {cmds, [RedisCommand, RedisCommand]}) + ), + ?assertMatch( + {error, [{ok, <<"PONG">>}, {error, _}]}, + emqx_resource:query(PoolName, {cmds, [RedisCommand, [<<"INVALID_COMMAND">>]]}) + ), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. @@ -152,14 +160,14 @@ redis_config_cluster() -> redis_config_sentinel() -> redis_config_base("sentinel", "servers"). --define(REDIS_CONFIG_BASE(MaybeSentinel), +-define(REDIS_CONFIG_BASE(MaybeSentinel, MaybeDatabase), "" ++ "\n" ++ " auto_reconnect = true\n" ++ - " database = 1\n" ++ " pool_size = 8\n" ++ " redis_type = ~s\n" ++ MaybeSentinel ++ + MaybeDatabase ++ " password = public\n" ++ " ~s = \"~s:~b\"\n" ++ " " ++ @@ -171,15 +179,22 @@ redis_config_base(Type, ServerKey) -> "sentinel" -> Host = ?REDIS_SENTINEL_HOST, Port = ?REDIS_SENTINEL_PORT, - MaybeSentinel = " sentinel = mymaster\n"; - _ -> + MaybeSentinel = " sentinel = mymaster\n", + MaybeDatabase = " database = 1\n"; + "single" -> Host = ?REDIS_SINGLE_HOST, Port = ?REDIS_SINGLE_PORT, - MaybeSentinel = "" + MaybeSentinel = "", + MaybeDatabase = " database = 1\n"; + "cluster" -> + Host = ?REDIS_SINGLE_HOST, + Port = ?REDIS_SINGLE_PORT, + MaybeSentinel = "", + MaybeDatabase = "" end, RawConfig = list_to_binary( io_lib:format( - ?REDIS_CONFIG_BASE(MaybeSentinel), + ?REDIS_CONFIG_BASE(MaybeSentinel, MaybeDatabase), [Type, ServerKey, Host, Port] ) ), diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index 94f9379df..fba33559e 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -4,3 +4,5 @@ kafka mongo mongo_rs_sharded mysql +redis +redis_cluster diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_redis.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_redis.conf new file mode 100644 index 000000000..a5744df4c --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_redis.conf @@ -0,0 +1,73 @@ +emqx_ee_bridge_redis { + local_topic { + desc { + en: """The MQTT topic filter to be forwarded to Redis. All MQTT 'PUBLISH' messages with the topic +matching the local_topic will be forwarded.
+NOTE: if this bridge is used as the action of a rule (EMQX rule engine), and also local_topic is +configured, then both the data got from the rule and the MQTT messages that match local_topic +will be forwarded. +""" + zh: """发送到 'local_topic' 的消息都会转发到 Redis。
+注意:如果这个 Bridge 被用作规则(EMQX 规则引擎)的输出,同时也配置了 'local_topic' ,那么这两部分的消息都会被转发到 Redis。 +""" + } + label { + en: "Local Topic" + zh: "本地 Topic" + } + } + + command_template { + desc { + en: """Redis Command Template""" + zh: """Redis Command 模板""" + } + label { + en: "Redis Command Template" + zh: "Redis Command 模板" + } + } + config_enable { + desc { + en: """Enable or disable this bridge""" + zh: """启用/禁用桥接""" + } + label { + en: "Enable Or Disable Bridge" + zh: "启用/禁用桥接" + } + } + + desc_config { + desc { + en: """Configuration for a Redis bridge.""" + zh: """Resis 桥接配置""" + } + label: { + en: "Redis Bridge Configuration" + zh: "Redis 桥接配置" + } + } + + desc_type { + desc { + en: """The Bridge Type""" + zh: """Bridge 类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + + desc_name { + desc { + en: """Bridge name, used as a human-readable description of the bridge.""" + zh: """桥接名字,可读描述""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index e0d362f5e..0f88e8315 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -22,7 +22,10 @@ api_schemas(Method) -> ref(emqx_ee_bridge_hstreamdb, Method), %% ref(emqx_ee_bridge_influxdb, Method ++ "_udp"), ref(emqx_ee_bridge_influxdb, Method ++ "_api_v1"), - ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2") + ref(emqx_ee_bridge_influxdb, Method ++ "_api_v2"), + ref(emqx_ee_bridge_redis, Method ++ "_single"), + ref(emqx_ee_bridge_redis, Method ++ "_sentinel"), + ref(emqx_ee_bridge_redis, Method ++ "_cluster") ]. schema_modules() -> @@ -31,7 +34,8 @@ schema_modules() -> emqx_ee_bridge_hstreamdb, emqx_ee_bridge_influxdb, emqx_ee_bridge_mongodb, - emqx_ee_bridge_mysql + emqx_ee_bridge_mysql, + emqx_ee_bridge_redis ]. examples(Method) -> @@ -55,7 +59,10 @@ resource_type(mongodb_single) -> emqx_connector_mongo; resource_type(mysql) -> emqx_connector_mysql; resource_type(influxdb_udp) -> emqx_ee_connector_influxdb; resource_type(influxdb_api_v1) -> emqx_ee_connector_influxdb; -resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb. +resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb; +resource_type(redis_single) -> emqx_ee_connector_redis; +resource_type(redis_sentinel) -> emqx_ee_connector_redis; +resource_type(redis_cluster) -> emqx_ee_connector_redis. fields(bridges) -> [ @@ -83,7 +90,7 @@ fields(bridges) -> required => false } )} - ] ++ mongodb_structs() ++ influxdb_structs(). + ] ++ mongodb_structs() ++ influxdb_structs() ++ redis_structs(). mongodb_structs() -> [ @@ -114,3 +121,20 @@ influxdb_structs() -> influxdb_api_v2 ] ]. + +redis_structs() -> + [ + {Type, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_redis, Type)), + #{ + desc => <<"Redis Bridge Config">>, + required => false + } + )} + || Type <- [ + redis_single, + redis_sentinel, + redis_cluster + ] + ]. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl new file mode 100644 index 000000000..5360efa7f --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_redis.erl @@ -0,0 +1,193 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_redis). + +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"redis_single">> => #{ + summary => <<"Redis Single Node Bridge">>, + value => values("single", Method) + } + }, + #{ + <<"redis_sentinel">> => #{ + summary => <<"Redis Sentinel Bridge">>, + value => values("sentinel", Method) + } + }, + #{ + <<"redis_cluster">> => #{ + summary => <<"Redis Cluster Bridge">>, + value => values("cluster", Method) + } + } + ]. + +values(Protocol, get) -> + maps:merge(values(Protocol, post), ?METRICS_EXAMPLE); +values("single", post) -> + SpecificOpts = #{ + server => <<"127.0.0.1:6379">>, + database => 1 + }, + values(common, "single", SpecificOpts); +values("sentinel", post) -> + SpecificOpts = #{ + servers => [<<"127.0.0.1:26379">>], + sentinel => <<"mymaster">>, + database => 1 + }, + values(common, "sentinel", SpecificOpts); +values("cluster", post) -> + SpecificOpts = #{ + servers => [<<"127.0.0.1:6379">>] + }, + values(common, "cluster", SpecificOpts); +values(Protocol, put) -> + maps:without([type, name], values(Protocol, post)). + +values(common, RedisType, SpecificOpts) -> + Config = #{ + type => list_to_atom("redis_" ++ RedisType), + name => <<"redis_bridge">>, + enable => true, + local_topic => <<"local/topic/#">>, + pool_size => 8, + password => <<"secret">>, + auto_reconnect => true, + command_template => [<<"LPUSH">>, <<"MSGS">>, <<"${payload}">>], + resource_opts => #{ + enable_batch => false, + batch_size => 100, + batch_time => <<"20ms">> + }, + ssl => #{enable => false} + }, + maps:merge(Config, SpecificOpts). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions +namespace() -> "bridge_redis". + +roots() -> []. + +fields("post_single") -> + method_fileds(post, redis_single); +fields("post_sentinel") -> + method_fileds(post, redis_sentinel); +fields("post_cluster") -> + method_fileds(post, redis_cluster); +fields("put_single") -> + method_fileds(put, redis_single); +fields("put_sentinel") -> + method_fileds(put, redis_sentinel); +fields("put_cluster") -> + method_fileds(put, redis_cluster); +fields("get_single") -> + method_fileds(get, redis_single); +fields("get_sentinel") -> + method_fileds(get, redis_sentinel); +fields("get_cluster") -> + method_fileds(get, redis_cluster); +fields(Type) when + Type == redis_single orelse Type == redis_sentinel orelse Type == redis_cluster +-> + redis_bridge_common_fields() ++ + connector_fields(Type). + +method_fileds(post, ConnectorType) -> + redis_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType); +method_fileds(get, ConnectorType) -> + redis_bridge_common_fields() ++ + connector_fields(ConnectorType) ++ + type_name_fields(ConnectorType) ++ + emqx_bridge_schema:metrics_status_fields(); +method_fileds(put, ConnectorType) -> + redis_bridge_common_fields() ++ + connector_fields(ConnectorType). + +redis_bridge_common_fields() -> + emqx_bridge_schema:common_bridge_fields() ++ + [ + {local_topic, mk(binary(), #{desc => ?DESC("local_topic")})}, + {command_template, fun command_template/1} + ] ++ + emqx_resource_schema:fields("resource_opts"). + +connector_fields(Type) -> + RedisType = bridge_type_to_redis_conn_type(Type), + emqx_connector_redis:fields(RedisType). + +bridge_type_to_redis_conn_type(redis_single) -> + single; +bridge_type_to_redis_conn_type(redis_sentinel) -> + sentinel; +bridge_type_to_redis_conn_type(redis_cluster) -> + cluster. + +type_name_fields(Type) -> + [ + {type, mk(Type, #{required => true, desc => ?DESC("desc_type")})}, + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})} + ]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Redis using `", string:to_upper(Method), "` method."]; +desc(redis_single) -> + ?DESC(emqx_connector_redis, "single"); +desc(redis_sentinel) -> + ?DESC(emqx_connector_redis, "sentinel"); +desc(redis_cluster) -> + ?DESC(emqx_connector_redis, "cluster"); +desc(_) -> + undefined. + +command_template(type) -> + list(binary()); +command_template(required) -> + true; +command_template(validator) -> + fun is_command_template_valid/1; +command_template(desc) -> + ?DESC("command_template"); +command_template(_) -> + undefined. + +is_command_template_valid(CommandSegments) -> + case + is_list(CommandSegments) andalso length(CommandSegments) > 0 andalso + lists:all(fun is_binary/1, CommandSegments) + of + true -> + ok; + false -> + {error, + "the value of the field 'command_template' should be a nonempty " + "list of strings (templates for Redis command and arguments)"} + end. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl new file mode 100644 index 000000000..cd6a2d212 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_redis_SUITE.erl @@ -0,0 +1,493 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_redis_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +-define(REDIS_TOXYPROXY_CONNECT_CONFIG, #{ + <<"server">> => <<"toxiproxy:6379">> +}). + +-define(COMMON_REDIS_OPTS, #{ + <<"password">> => <<"public">>, + <<"command_template">> => [<<"RPUSH">>, <<"MSGS">>, <<"${payload}">>], + <<"local_topic">> => <<"local_topic/#">> +}). + +-define(BATCH_SIZE, 5). + +-define(PROXY_HOST, "toxiproxy"). +-define(PROXY_PORT, "8474"). + +all() -> [{group, redis_types}, {group, rest}]. + +groups() -> + ResourceSpecificTCs = [t_create_delete_bridge], + TCs = emqx_common_test_helpers:all(?MODULE) -- ResourceSpecificTCs, + TypeGroups = [ + {group, redis_single}, + {group, redis_sentinel}, + {group, redis_cluster} + ], + BatchGroups = [ + {group, batch_on}, + {group, batch_off} + ], + [ + {rest, TCs}, + {redis_types, [ + {group, tcp}, + {group, tls} + ]}, + {tcp, TypeGroups}, + {tls, TypeGroups}, + {redis_single, BatchGroups}, + {redis_sentinel, BatchGroups}, + {redis_cluster, BatchGroups}, + {batch_on, ResourceSpecificTCs}, + {batch_off, ResourceSpecificTCs} + ]. + +init_per_group(Group, Config) when + Group =:= redis_single; Group =:= redis_sentinel; Group =:= redis_cluster +-> + [{redis_type, Group} | Config]; +init_per_group(Group, Config) when + Group =:= tcp; Group =:= tls +-> + [{transport, Group} | Config]; +init_per_group(Group, Config) when + Group =:= batch_on; Group =:= batch_off +-> + [{batch_mode, Group} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_suite(Config) -> + TestHosts = all_test_hosts(), + case emqx_common_test_helpers:is_all_tcp_servers_available(TestHosts) of + true -> + ProxyHost = os:getenv("PROXY_HOST", ?PROXY_HOST), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", ?PROXY_PORT)), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([ + emqx_resource, emqx_bridge, emqx_rule_engine + ]), + {ok, _} = application:ensure_all_started(emqx_connector), + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort} + | Config + ]; + false -> + {skip, no_redis} + end. + +end_per_suite(_Config) -> + ok = delete_all_bridges(), + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_rule_engine, emqx_bridge, emqx_resource]), + _ = application:stop(emqx_connector), + ok. + +init_per_testcase(_Testcase, Config) -> + ok = delete_all_bridges(), + case ?config(redis_type, Config) of + undefined -> + Config; + RedisType -> + Transport = ?config(transport, Config), + BatchMode = ?config(batch_mode, Config), + #{RedisType := #{Transport := RedisConnConfig}} = redis_connect_configs(), + #{BatchMode := ResourceConfig} = resource_configs(), + IsBatch = (BatchMode =:= batch_on), + BridgeConfig0 = maps:merge(RedisConnConfig, ?COMMON_REDIS_OPTS), + BridgeConfig1 = BridgeConfig0#{<<"resource_opts">> => ResourceConfig}, + [{bridge_config, BridgeConfig1}, {is_batch, IsBatch} | Config] + end. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + ok = snabbkaffe:stop(), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = delete_all_bridges(). + +t_create_delete_bridge(Config) -> + Name = <<"mybridge">>, + Type = ?config(redis_type, Config), + BridgeConfig = ?config(bridge_config, Config), + IsBatch = ?config(is_batch, Config), + ?assertMatch( + {ok, _}, + emqx_bridge:create(Type, Name, BridgeConfig) + ), + + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + + ?assertEqual( + {ok, connected}, + emqx_resource:health_check(ResourceId) + ), + + RedisType = atom_to_binary(Type), + Action = <>, + + RuleId = <<"my_rule_id">>, + RuleConf = #{ + actions => [Action], + description => <<>>, + enable => true, + id => RuleId, + name => <<>>, + sql => <<"SELECT * FROM \"t/#\"">> + }, + + %% check export by rule + {ok, _} = emqx_rule_engine:create_rule(RuleConf), + _ = check_resource_queries(ResourceId, <<"t/test">>, IsBatch), + ok = emqx_rule_engine:delete_rule(RuleId), + + %% check export through local topic + _ = check_resource_queries(ResourceId, <<"local_topic/test">>, IsBatch), + + {ok, _} = emqx_bridge:remove(Type, Name). + +% check that we provide correct examples +t_check_values(_Config) -> + lists:foreach( + fun(Method) -> + lists:foreach( + fun({RedisType, #{value := Value0}}) -> + Value = maps:without(maps:keys(?METRICS_EXAMPLE), Value0), + MethodBin = atom_to_binary(Method), + Type = string:slice(RedisType, length("redis_")), + RefName = binary_to_list(<>), + Schema = conf_schema(RefName), + ?assertMatch( + #{}, + hocon_tconf:check_plain(Schema, #{<<"root">> => Value}, #{ + atom_key => true, + required => false + }) + ) + end, + lists:flatmap( + fun maps:to_list/1, + emqx_ee_bridge_redis:conn_bridge_examples(Method) + ) + ) + end, + [put, post, get] + ). + +t_check_replay(Config) -> + Name = <<"toxic_bridge">>, + Type = <<"redis_single">>, + Topic = <<"local_topic/test">>, + ProxyName = "redis_single_tcp", + + ?assertMatch( + {ok, _}, + emqx_bridge:create(Type, Name, toxiproxy_redis_bridge_config()) + ), + + ResourceId = emqx_bridge_resource:resource_id(Type, Name), + Health = emqx_resource:health_check(ResourceId), + + ?assertEqual( + {ok, connected}, + Health + ), + + ?check_trace( + begin + ?wait_async_action( + with_down_failure(Config, ProxyName, fun() -> + ct:sleep(100), + lists:foreach( + fun(_) -> + _ = publish_message(Topic, <<"test_payload">>) + end, + lists:seq(1, ?BATCH_SIZE) + ) + end), + #{?snk_kind := redis_ee_connector_send_done, batch := true, result := {ok, _}}, + 10000 + ) + end, + fun(Trace) -> + ?assert( + ?strict_causality( + #{?snk_kind := redis_ee_connector_send_done, result := {error, _}}, + #{?snk_kind := redis_ee_connector_send_done, result := {ok, _}}, + Trace + ) + ) + end + ), + {ok, _} = emqx_bridge:remove(Type, Name). + +t_permanent_error(_Config) -> + Name = <<"invalid_command_bridge">>, + Type = <<"redis_single">>, + Topic = <<"local_topic/test">>, + Payload = <<"payload for invalid redis command">>, + + ?assertMatch( + {ok, _}, + emqx_bridge:create(Type, Name, invalid_command_bridge_config()) + ), + + ?check_trace( + begin + ?wait_async_action( + publish_message(Topic, Payload), + #{?snk_kind := redis_ee_connector_send_done}, + 10000 + ) + end, + fun(Trace) -> + ?assertMatch( + [#{result := {error, _}} | _], + ?of_kind(redis_ee_connector_send_done, Trace) + ) + end + ), + {ok, _} = emqx_bridge:remove(Type, Name). + +t_create_disconnected(Config) -> + Name = <<"toxic_bridge">>, + Type = <<"redis_single">>, + + ?check_trace( + with_down_failure(Config, "redis_single_tcp", fun() -> + {ok, _} = emqx_bridge:create( + Type, Name, toxiproxy_redis_bridge_config() + ) + end), + fun(Trace) -> + ?assertMatch( + [#{error := _} | _], + ?of_kind(redis_ee_connector_start_error, Trace) + ), + ok + end + ), + {ok, _} = emqx_bridge:remove(Type, Name). + +%%------------------------------------------------------------------------------ +%% Helper functions +%%------------------------------------------------------------------------------ + +with_down_failure(Config, Name, F) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + emqx_common_test_helpers:with_failure(down, Name, ProxyHost, ProxyPort, F). + +check_resource_queries(ResourceId, Topic, IsBatch) -> + RandomPayload = rand:bytes(20), + N = + case IsBatch of + true -> ?BATCH_SIZE; + false -> 1 + end, + ?check_trace( + begin + ?wait_async_action( + lists:foreach( + fun(_) -> + _ = publish_message(Topic, RandomPayload) + end, + lists:seq(1, N) + ), + #{?snk_kind := redis_ee_connector_send_done, batch := IsBatch}, + 1000 + ) + end, + fun(Trace) -> + AddedMsgCount = length(added_msgs(ResourceId, RandomPayload)), + case IsBatch of + true -> + ?assertMatch( + [#{result := {ok, _}, batch := true, batch_size := ?BATCH_SIZE} | _], + ?of_kind(redis_ee_connector_send_done, Trace) + ), + ?assertEqual(?BATCH_SIZE, AddedMsgCount); + false -> + ?assertMatch( + [#{result := {ok, _}, batch := false} | _], + ?of_kind(redis_ee_connector_send_done, Trace) + ), + ?assertEqual(1, AddedMsgCount) + end + end + ). + +added_msgs(ResourceId, Payload) -> + {ok, Results} = emqx_resource:simple_sync_query( + ResourceId, {cmd, [<<"LRANGE">>, <<"MSGS">>, <<"0">>, <<"-1">>]} + ), + [El || El <- Results, El =:= Payload]. + +conf_schema(StructName) -> + #{ + fields => #{}, + translations => #{}, + validations => [], + namespace => undefined, + roots => [{root, hoconsc:ref(emqx_ee_bridge_redis, StructName)}] + }. + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +all_test_hosts() -> + Confs = [ + ?REDIS_TOXYPROXY_CONNECT_CONFIG + | lists:concat([ + maps:values(TypeConfs) + || TypeConfs <- maps:values(redis_connect_configs()) + ]) + ], + lists:flatmap( + fun + (#{<<"servers">> := ServersRaw}) -> + lists:map( + fun(Server) -> + parse_server(Server) + end, + string:tokens(binary_to_list(ServersRaw), ", ") + ); + (#{<<"server">> := ServerRaw}) -> + [parse_server(ServerRaw)] + end, + Confs + ). + +parse_server(Server) -> + emqx_connector_schema_lib:parse_server(Server, #{ + host_type => hostname, + default_port => 6379 + }). + +redis_connect_ssl_opts(Type) -> + maps:merge( + client_ssl_cert_opts(Type), + #{ + <<"enable">> => <<"true">>, + <<"verify">> => <<"verify_none">> + } + ). + +client_ssl_cert_opts(redis_single) -> + emqx_authn_test_lib:client_ssl_cert_opts(); +client_ssl_cert_opts(_) -> + Dir = code:lib_dir(emqx, etc), + #{ + <<"keyfile">> => filename:join([Dir, <<"certs">>, <<"client-key.pem">>]), + <<"certfile">> => filename:join([Dir, <<"certs">>, <<"client-cert.pem">>]), + <<"cacertfile">> => filename:join([Dir, <<"certs">>, <<"cacert.pem">>]) + }. + +redis_connect_configs() -> + #{ + redis_single => #{ + tcp => #{ + <<"server">> => <<"redis:6379">> + }, + tls => #{ + <<"server">> => <<"redis-tls:6380">>, + <<"ssl">> => redis_connect_ssl_opts(redis_single) + } + }, + redis_sentinel => #{ + tcp => #{ + <<"servers">> => <<"redis-sentinel:26379">>, + <<"sentinel">> => <<"mymaster">> + }, + tls => #{ + <<"servers">> => <<"redis-sentinel-tls:26380">>, + <<"sentinel">> => <<"mymaster">>, + <<"ssl">> => redis_connect_ssl_opts(redis_sentinel) + } + }, + redis_cluster => #{ + tcp => #{ + <<"servers">> => <<"redis-cluster:7000,redis-cluster:7001,redis-cluster:7002">> + }, + tls => #{ + <<"servers">> => + <<"redis-cluster-tls:8000,redis-cluster-tls:8001,redis-cluster-tls:8002">>, + <<"ssl">> => redis_connect_ssl_opts(redis_cluster) + } + } + }. + +toxiproxy_redis_bridge_config() -> + Conf0 = ?REDIS_TOXYPROXY_CONNECT_CONFIG#{ + <<"resource_opts">> => #{ + <<"query_mode">> => <<"async">>, + <<"enable_batch">> => <<"true">>, + <<"enable_queue">> => <<"true">>, + <<"worker_pool_size">> => <<"1">>, + <<"batch_size">> => integer_to_binary(?BATCH_SIZE), + <<"health_check_interval">> => <<"1s">> + } + }, + maps:merge(Conf0, ?COMMON_REDIS_OPTS). + +invalid_command_bridge_config() -> + #{redis_single := #{tcp := Conf0}} = redis_connect_configs(), + Conf1 = maps:merge(Conf0, ?COMMON_REDIS_OPTS), + Conf1#{ + <<"resource_opts">> => #{ + <<"enable_batch">> => <<"false">>, + <<"enable_queue">> => <<"false">>, + <<"worker_pool_size">> => <<"1">> + }, + <<"command_template">> => [<<"BAD">>, <<"COMMAND">>, <<"${payload}">>] + }. + +resource_configs() -> + #{ + batch_off => #{ + <<"query_mode">> => <<"sync">>, + <<"enable_batch">> => <<"false">>, + <<"enable_queue">> => <<"false">> + }, + batch_on => #{ + <<"query_mode">> => <<"async">>, + <<"enable_batch">> => <<"true">>, + <<"enable_queue">> => <<"true">>, + <<"worker_pool_size">> => <<"1">>, + <<"batch_size">> => integer_to_binary(?BATCH_SIZE) + } + }. + +publish_message(Topic, Payload) -> + {ok, Client} = emqtt:start_link(), + {ok, _} = emqtt:connect(Client), + ok = emqtt:publish(Client, Topic, Payload), + ok = emqtt:stop(Client). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl new file mode 100644 index 000000000..39579c737 --- /dev/null +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_redis.erl @@ -0,0 +1,138 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_connector_redis). + +-include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_batch_query/3, + on_get_status/2 +]). + +%% ------------------------------------------------------------------------------------------------- +%% resource callbacks +%% ------------------------------------------------------------------------------------------------- + +callback_mode() -> always_sync. + +on_start(InstId, #{command_template := CommandTemplate} = Config) -> + case emqx_connector_redis:on_start(InstId, Config) of + {ok, RedisConnSt} -> + ?tp( + redis_ee_connector_start_success, + #{} + ), + {ok, #{ + conn_st => RedisConnSt, + command_template => preproc_command_template(CommandTemplate) + }}; + {error, _} = Error -> + ?tp( + redis_ee_connector_start_error, + #{error => Error} + ), + Error + end. + +on_stop(InstId, #{conn_st := RedisConnSt}) -> + emqx_connector_redis:on_stop(InstId, RedisConnSt). + +on_get_status(InstId, #{conn_st := RedisConnSt}) -> + emqx_connector_redis:on_get_status(InstId, RedisConnSt). + +on_query( + InstId, + {send_message, Data}, + _State = #{ + command_template := CommandTemplate, conn_st := RedisConnSt + } +) -> + Cmd = proc_command_template(CommandTemplate, Data), + ?tp( + redis_ee_connector_cmd, + #{cmd => Cmd, batch => false, mode => sync} + ), + Result = query(InstId, {cmd, Cmd}, RedisConnSt), + ?tp( + redis_ee_connector_send_done, + #{cmd => Cmd, batch => false, mode => sync, result => Result} + ), + Result; +on_query( + InstId, + Query, + _State = #{conn_st := RedisConnSt} +) -> + ?tp( + redis_ee_connector_query, + #{query => Query, batch => false, mode => sync} + ), + Result = query(InstId, Query, RedisConnSt), + ?tp( + redis_ee_connector_send_done, + #{query => Query, batch => false, mode => sync, result => Result} + ), + Result. + +on_batch_query( + InstId, BatchData, _State = #{command_template := CommandTemplate, conn_st := RedisConnSt} +) -> + Cmds = process_batch_data(BatchData, CommandTemplate), + ?tp( + redis_ee_connector_send, + #{batch_data => BatchData, batch => true, mode => sync} + ), + Result = query(InstId, {cmds, Cmds}, RedisConnSt), + ?tp( + redis_ee_connector_send_done, + #{ + batch_data => BatchData, + batch_size => length(BatchData), + batch => true, + mode => sync, + result => Result + } + ), + Result. + +%% ------------------------------------------------------------------------------------------------- +%% private helpers +%% ------------------------------------------------------------------------------------------------- + +query(InstId, Query, RedisConnSt) -> + case emqx_connector_redis:on_query(InstId, Query, RedisConnSt) of + {ok, _} = Ok -> Ok; + {error, no_connection} -> {error, {recoverable_error, no_connection}}; + {error, _} = Error -> Error + end. + +process_batch_data(BatchData, CommandTemplate) -> + lists:map( + fun({send_message, Data}) -> + proc_command_template(CommandTemplate, Data) + end, + BatchData + ). + +proc_command_template(CommandTemplate, Msg) -> + lists:map( + fun(ArgTks) -> + emqx_plugin_libs_rule:proc_tmpl(ArgTks, Msg, #{return => full_binary}) + end, + CommandTemplate + ). + +preproc_command_template(CommandTemplate) -> + lists:map( + fun emqx_plugin_libs_rule:preproc_tmpl/1, + CommandTemplate + ). diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 18dfb2525..ae779c572 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -113,6 +113,10 @@ for dep in ${CT_DEPS}; do '.ci/docker-compose-file/docker-compose-redis-sentinel-tcp.yaml' '.ci/docker-compose-file/docker-compose-redis-sentinel-tls.yaml' ) ;; + redis_cluster) + FILES+=( '.ci/docker-compose-file/docker-compose-redis-cluster-tcp.yaml' + '.ci/docker-compose-file/docker-compose-redis-cluster-tls.yaml' ) + ;; mysql) FILES+=( '.ci/docker-compose-file/docker-compose-mysql-tcp.yaml' '.ci/docker-compose-file/docker-compose-mysql-tls.yaml' )