diff --git a/apps/emqx_connector/rebar.config b/apps/emqx_connector/rebar.config index a5d453287..5f66d10d5 100644 --- a/apps/emqx_connector/rebar.config +++ b/apps/emqx_connector/rebar.config @@ -13,7 +13,7 @@ %% NOTE: mind poolboy version when updating mongodb-erlang version {mongodb, {git,"https://github.com/emqx/mongodb-erlang", {tag, "v3.0.11"}}}, %% NOTE: mind poolboy version when updating eredis_cluster version - {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.6.7"}}}, + {eredis_cluster, {git, "https://github.com/emqx/eredis_cluster", {tag, "0.7.0"}}}, %% mongodb-erlang uses a special fork https://github.com/comtihon/poolboy.git %% (which has overflow_ttl feature added). %% However, it references `{branch, "master}` (commit 9c06a9a on 2021-04-07). diff --git a/apps/emqx_connector/src/emqx_connector_redis.erl b/apps/emqx_connector/src/emqx_connector_redis.erl index f25ef4331..91928acc6 100644 --- a/apps/emqx_connector/src/emqx_connector_redis.erl +++ b/apps/emqx_connector/src/emqx_connector_redis.erl @@ -129,10 +129,13 @@ on_start(InstId, #{redis_type := Type, end end. -on_stop(InstId, #{poolname := PoolName}) -> +on_stop(InstId, #{poolname := PoolName, type := Type}) -> ?SLOG(info, #{msg => "stopping_redis_connector", connector => InstId}), - emqx_plugin_libs_pool:stop_pool(PoolName). + case Type of + cluster -> eredis_cluster:stop_pool(PoolName); + _ -> emqx_plugin_libs_pool:stop_pool(PoolName) + end. on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := Type} = State) -> ?TRACE("QUERY", "redis_connector_received", @@ -151,16 +154,30 @@ on_query(InstId, {cmd, Command}, AfterCommand, #{poolname := PoolName, type := T end, Result. +extract_eredis_cluster_workers(PoolName) -> + lists:flatten([gen_server:call(PoolPid, get_all_workers) || + PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]). + +eredis_cluster_workers_exist_and_are_connected(Workers) -> + length(Workers) > 0 andalso lists:all( + fun({_, Pid, _, _}) -> + eredis_cluster_pool_worker:is_connected(Pid) =:= true + end, Workers). + on_health_check(_InstId, #{type := cluster, poolname := PoolName} = State) -> - Workers = lists:flatten([gen_server:call(PoolPid, get_all_workers) || - PoolPid <- eredis_cluster_monitor:get_all_pools(PoolName)]), - case length(Workers) > 0 andalso lists:all( - fun({_, Pid, _, _}) -> - eredis_cluster_pool_worker:is_connected(Pid) =:= true - end, Workers) of - true -> {ok, State}; - false -> {error, health_check_failed, State} + case eredis_cluster:pool_exists(PoolName) of + true -> + Workers = extract_eredis_cluster_workers(PoolName), + case eredis_cluster_workers_exist_and_are_connected(Workers) of + true -> {ok, State}; + false -> {error, health_check_failed, State} + end; + + false -> + {error, health_check_failed, State} end; + + on_health_check(_InstId, #{poolname := PoolName} = State) -> emqx_plugin_libs_pool:health_check(PoolName, fun ?MODULE:do_health_check/1, State). diff --git a/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl new file mode 100644 index 000000000..6c0234bec --- /dev/null +++ b/apps/emqx_connector/test/emqx_connector_redis_SUITE.erl @@ -0,0 +1,139 @@ +% %%-------------------------------------------------------------------- +% %% Copyright (c) 2020-2022 EMQ Technologies Co., Ltd. All Rights Reserved. +% %% +% %% Licensed under the Apache License, Version 2.0 (the "License"); +% %% you may not use this file except in compliance with the License. +% %% You may obtain a copy of the License at +% %% http://www.apache.org/licenses/LICENSE-2.0 +% %% +% %% Unless required by applicable law or agreed to in writing, software +% %% distributed under the License is distributed on an "AS IS" BASIS, +% %% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +% %% See the License for the specific language governing permissions and +% %% limitations under the License. +% %%-------------------------------------------------------------------- + +-module(emqx_connector_redis_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include("emqx_connector.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include_lib("stdlib/include/assert.hrl"). + +-define(REDIS_HOST, "redis"). +-define(REDIS_PORT, 6379). +-define(REDIS_RESOURCE_MOD, emqx_connector_redis). + +all() -> + emqx_common_test_helpers:all(?MODULE). + +groups() -> + []. + +init_per_suite(Config) -> + case emqx_common_test_helpers:is_tcp_server_available(?REDIS_HOST, ?REDIS_PORT) of + true -> + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_connector]), + Config; + false -> + {skip, no_redis} + end. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_resource, emqx_connector]). + +init_per_testcase(_, Config) -> + Config. + +end_per_testcase(_, _Config) -> + ok. + +% %%------------------------------------------------------------------------------ +% %% Testcases +% %%------------------------------------------------------------------------------ + +t_single_lifecycle(_Config) -> + perform_lifecycle_check( + <<"emqx_connector_redis_SUITE_single">>, + redis_config_single(), + [<<"PING">>] + ). + +t_cluster_lifecycle(_Config) -> + perform_lifecycle_check( + <<"emqx_connector_redis_SUITE_cluster">>, + redis_config_cluster(), + [<<"PING">>, <<"PONG">>] + ). + +t_sentinel_lifecycle(_Config) -> + perform_lifecycle_check( + <<"emqx_connector_redis_SUITE_sentinel">>, + redis_config_sentinel(), + [<<"PING">>] + ). + +perform_lifecycle_check(PoolName, InitialConfig, RedisCommand) -> + {ok, #{config := CheckedConfig}} = emqx_resource:check_config(?REDIS_RESOURCE_MOD, InitialConfig), + {ok, #{state := #{poolname := ReturnedPoolName} = State, status := InitialStatus}} = emqx_resource:create_local( + PoolName, + ?CONNECTOR_RESOURCE_GROUP, + ?REDIS_RESOURCE_MOD, + CheckedConfig + ), + ?assertEqual(InitialStatus, started), + % Instance should match the state and status of the just started resource + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := InitialStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), + % Perform query as further check that the resource is working as expected + ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), + ?assertEqual(ok, emqx_resource:stop(PoolName)), + % Resource will be listed still, but state will be changed and healthcheck will fail + % as the worker no longer exists. + {ok, ?CONNECTOR_RESOURCE_GROUP, #{state := State, status := StoppedStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(StoppedStatus, stopped), + ?assertEqual({error,health_check_failed}, emqx_resource:health_check(PoolName)), + % Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself. + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + % Can call stop/1 again on an already stopped instance + ?assertEqual(ok, emqx_resource:stop(PoolName)), + % Make sure it can be restarted and the healthchecks and queries work properly + ?assertEqual(ok, emqx_resource:restart(PoolName)), + {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), + ?assertEqual(ok, emqx_resource:health_check(PoolName)), + ?assertEqual({ok, <<"PONG">>}, emqx_resource:query(PoolName, {cmd, RedisCommand})), + % Stop and remove the resource in one go. + ?assertEqual(ok, emqx_resource:remove_local(PoolName)), + ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), + % Should not even be able to get the resource data out of ets now unlike just stopping. + ?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)). + +% %%------------------------------------------------------------------------------ +% %% Helpers +% %%------------------------------------------------------------------------------ + +redis_config_single() -> + redis_config_base("single", "server"). + +redis_config_cluster() -> + redis_config_base("cluster", "servers"). + +redis_config_sentinel() -> + redis_config_base("sentinel", "servers"). + +redis_config_base(Type, ServerKey) -> + RawConfig = list_to_binary(io_lib:format(""" + auto_reconnect = true + database = 1 + pool_size = 8 + redis_type = ~s + password = public + ~s = \"~s:~b\" + """, [Type, ServerKey, ?REDIS_HOST, ?REDIS_PORT])), + + {ok, Config} = hocon:binary(RawConfig), + #{<<"config">> => Config}.