fix: redis connector should not timeout because no username and password

A redis connector of type single or sentinel always got a timeout error
when doing the connector test in the dashboard if no username or
password was provided. This commit makes sure that the user instead get
an informative error message. Additionally, this commit adds more
more error information for all redis connector types.

Fixes:
https://emqx.atlassian.net/browse/EMQX-12557
This commit is contained in:
Kjell Winblad 2024-06-20 11:26:53 +02:00 committed by Thales Macedo Garitezi
parent 14e2ed7be1
commit 6190192cbc
4 changed files with 89 additions and 15 deletions

View File

@ -945,6 +945,7 @@ t_on_get_status(Config, Opts) ->
ProxyHost = ?config(proxy_host, Config),
ProxyName = ?config(proxy_name, Config),
FailureStatus = maps:get(failure_status, Opts, disconnected),
NormalStatus = maps:get(normal_status, Opts, connected),
?assertMatch({ok, _}, create_bridge(Config)),
ResourceId = resource_id(Config),
%% Since the connection process is async, we give it some time to
@ -952,7 +953,7 @@ t_on_get_status(Config, Opts) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId))
),
case ProxyHost of
undefined ->
@ -971,7 +972,7 @@ t_on_get_status(Config, Opts) ->
?retry(
_Sleep = 1_000,
_Attempts = 20,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
?assertEqual({ok, NormalStatus}, emqx_resource_manager:health_check(ResourceId))
)
end,
ok.

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-define(BRIDGE_TYPE, redis).
-define(BRIDGE_TYPE_BIN, <<"redis">>).
@ -46,6 +47,7 @@ matrix_testcases() ->
t_start_stop,
t_create_via_http,
t_on_get_status,
t_on_get_status_no_username_pass,
t_sync_query,
t_map_to_redis_hset_args
].
@ -325,6 +327,43 @@ t_on_get_status(Config) when is_list(Config) ->
emqx_bridge_v2_testlib:t_on_get_status(Config, #{failure_status => connecting}),
ok.
t_on_get_status_no_username_pass(matrix) ->
{on_get_status, [
[single, tcp],
[cluster, tcp],
[sentinel, tcp]
]};
t_on_get_status_no_username_pass(Config0) when is_list(Config0) ->
ConnectorConfig0 = ?config(connector_config, Config0),
ConnectorConfig1 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"password">>], ConnectorConfig0, <<"">>
),
ConnectorConfig2 = emqx_utils_maps:deep_put(
[<<"parameters">>, <<"username">>], ConnectorConfig1, <<"">>
),
Config1 = proplists:delete(connector_config, Config0),
Config2 = [{connector_config, ConnectorConfig2} | Config1],
?check_trace(
emqx_bridge_v2_testlib:t_on_get_status(
Config2,
#{
failure_status => disconnected,
normal_status => disconnected
}
),
fun(ok, Trace) ->
case ?config(redis_type, Config2) of
single ->
?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace));
sentinel ->
?assertMatch([_ | _], ?of_kind(emqx_redis_auth_required_error, Trace));
cluster ->
ok
end
end
),
ok.
t_sync_query(matrix) ->
{sync_query, [
[single, tcp],

View File

@ -1,6 +1,6 @@
{application, emqx_redis, [
{description, "EMQX Redis Database Connector"},
{vsn, "0.1.5"},
{vsn, "0.1.6"},
{registered, []},
{applications, [
kernel,

View File

@ -19,6 +19,7 @@
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]).
@ -231,7 +232,7 @@ is_unrecoverable_error({error, invalid_cluster_command}) ->
is_unrecoverable_error(_) ->
false.
on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) ->
case eredis_cluster:pool_exists(PoolName) of
true ->
%% eredis_cluster has null slot even pool_exists when emqx start before redis cluster.
@ -242,24 +243,57 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName}) ->
[] ->
disconnected;
[_ | _] ->
Health = eredis_cluster:ping_all(PoolName),
status_result(Health)
do_cluster_status_check(PoolName, State)
end;
false ->
disconnected
end;
on_get_status(_InstId, #{pool_name := PoolName}) ->
Health = emqx_resource_pool:health_check_workers(PoolName, fun ?MODULE:do_get_status/1),
status_result(Health).
on_get_status(_InstId, #{pool_name := PoolName} = State) ->
Timeout = 1000,
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)],
DoPerWorker =
fun(Worker) ->
case ecpool_worker:client(Worker) of
{ok, Conn} ->
erlang:is_process_alive(Conn) andalso
ecpool_worker:exec(Worker, fun ?MODULE:do_get_status/1, Timeout);
Error ->
Error
end
end,
{ok, Results} =
try
{ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)}
catch
exit:timeout ->
{error, timeout}
end,
sum_worker_results(Results, State).
do_cluster_status_check(Pool, State) ->
Pongs = eredis_cluster:qa(Pool, [<<"PING">>]),
sum_worker_results(Pongs, State).
do_get_status(Conn) ->
case eredis:q(Conn, ["PING"]) of
{ok, _} -> true;
_ -> false
end.
eredis:q(Conn, ["PING"]).
status_result(_Status = true) -> connected;
status_result(_Status = false) -> connecting.
sum_worker_results([], _State) ->
connected;
sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) ->
?tp(emqx_redis_auth_required_error, #{}),
%% This requires user action to fix so we set the status to disconnected
{disconnected, State, Error};
sum_worker_results([{ok, _} | Rest], State) ->
sum_worker_results(Rest, State);
sum_worker_results([Error | _Rest], State) ->
?SLOG(
warning,
#{
msg => "emqx_redis_check_status_error",
error => Error
}
),
{connecting, State, Error}.
do_cmd(PoolName, cluster, {cmd, Command}) ->
eredis_cluster:q(PoolName, Command);