fix: code improvements thanks to comments from @thalesmg

This commit is contained in:
Kjell Winblad 2024-06-20 14:40:18 +02:00 committed by Thales Macedo Garitezi
parent 31509f02cc
commit 130571b56e
1 changed files with 18 additions and 25 deletions

View File

@ -20,6 +20,7 @@
-include_lib("hocon/include/hoconsc.hrl"). -include_lib("hocon/include/hoconsc.hrl").
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]). -export([namespace/0, roots/0, fields/1, redis_fields/0, desc/1]).
@ -241,34 +242,26 @@ on_get_status(_InstId, #{type := cluster, pool_name := PoolName} = State) ->
%% In this case, we can directly consider it as a disconnect and then proceed to reconnect. %% In this case, we can directly consider it as a disconnect and then proceed to reconnect.
case eredis_cluster_monitor:get_all_pools(PoolName) of case eredis_cluster_monitor:get_all_pools(PoolName) of
[] -> [] ->
disconnected; ?status_disconnected;
[_ | _] -> [_ | _] ->
do_cluster_status_check(PoolName, State) do_cluster_status_check(PoolName, State)
end; end;
false -> false ->
disconnected ?status_disconnected
end; end;
on_get_status(_InstId, #{pool_name := PoolName} = State) -> on_get_status(_InstId, #{pool_name := PoolName} = State) ->
Timeout = 1000, HealthCheckResoults = emqx_resource_pool:health_check_workers(
Workers = [Worker || {_WorkerName, Worker} <- ecpool:workers(PoolName)], PoolName,
DoPerWorker = fun ?MODULE:do_get_status/1,
fun(Worker) -> emqx_resource_pool:health_check_timeout(),
case ecpool_worker:client(Worker) of #{return_values => true}
{ok, Conn} -> ),
erlang:is_process_alive(Conn) andalso case HealthCheckResoults of
ecpool_worker:exec(Worker, fun ?MODULE:do_get_status/1, Timeout); {ok, Results} ->
Error -> sum_worker_results(Results, State);
Error Error ->
end {?status_disconnected, State, Error}
end, end.
{ok, Results} =
try
{ok, emqx_utils:pmap(DoPerWorker, Workers, Timeout)}
catch
exit:timeout ->
{error, timeout}
end,
sum_worker_results(Results, State).
do_cluster_status_check(Pool, State) -> do_cluster_status_check(Pool, State) ->
Pongs = eredis_cluster:qa(Pool, [<<"PING">>]), Pongs = eredis_cluster:qa(Pool, [<<"PING">>]),
@ -278,11 +271,11 @@ do_get_status(Conn) ->
eredis:q(Conn, ["PING"]). eredis:q(Conn, ["PING"]).
sum_worker_results([], _State) -> sum_worker_results([], _State) ->
connected; ?status_connected;
sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) -> sum_worker_results([{error, <<"NOAUTH Authentication required.">>} = Error | _Rest], State) ->
?tp(emqx_redis_auth_required_error, #{}), ?tp(emqx_redis_auth_required_error, #{}),
%% This requires user action to fix so we set the status to disconnected %% This requires user action to fix so we set the status to disconnected
{disconnected, State, Error}; {?status_disconnected, State, {unhealthy_target, Error}};
sum_worker_results([{ok, _} | Rest], State) -> sum_worker_results([{ok, _} | Rest], State) ->
sum_worker_results(Rest, State); sum_worker_results(Rest, State);
sum_worker_results([Error | _Rest], State) -> sum_worker_results([Error | _Rest], State) ->
@ -293,7 +286,7 @@ sum_worker_results([Error | _Rest], State) ->
error => Error error => Error
} }
), ),
{connecting, State, Error}. {?status_connecting, State, Error}.
do_cmd(PoolName, cluster, {cmd, Command}) -> do_cmd(PoolName, cluster, {cmd, Command}) ->
eredis_cluster:q(PoolName, Command); eredis_cluster:q(PoolName, Command);