chore: improve Kafka producer health-check error logs

The Kafka producer lib wolff returns detailed error logs
about which host:port had issue, EMQX should log them to help
troubleshooting
This commit is contained in:
zmstone 2024-05-18 18:44:10 +02:00
parent c3bc3cc514
commit 331d44a78a
2 changed files with 46 additions and 25 deletions

View File

@ -572,33 +572,54 @@ check_client_connectivity(ClientId) ->
{error, {find_client, Reason}}
end.
is_alive(Pid) ->
is_pid(Pid) andalso erlang:is_process_alive(Pid).
error_summary(Map, [Error]) ->
Map#{error => Error};
error_summary(Map, [Error | More]) ->
Map#{first_error => Error, total_errors => length(More) + 1}.
check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
Leaders =
case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
{ok, LeadersToCheck} ->
{ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable.
lists:filtermap(
fun({_Partition, Pid}) ->
case is_pid(Pid) andalso erlang:is_process_alive(Pid) of
true -> {true, Pid};
_ -> false
end
end,
LeadersToCheck
);
{error, _} ->
[]
end,
case Leaders of
[] ->
throw(#{
error => no_connected_partition_leader,
case lists:partition(fun({_Partition, Pid}) -> is_alive(Pid) end, Leaders) of
{[], Errors} ->
throw(
error_summary(
#{
cause => "no_connected_partition_leader",
kafka_client => ClientId,
kafka_topic => KafkaTopic,
partitions_limit => MaxPartitions
});
_ ->
kafka_topic => KafkaTopic
},
Errors
)
);
{_, []} ->
ok;
{_, Errors} ->
?SLOG(
warning,
"not_all_kafka_partitions_connected",
error_summary(
#{
kafka_client => ClientId,
kafka_topic => KafkaTopic
},
Errors
)
),
ok
end;
{error, Reason} ->
%% If failed to fetch metadata, wolff_client logs a warning level message
%% which includes the reason for each seed host
throw(#{
cause => Reason,
kafka_client => ClientId,
kafka_topic => KafkaTopic
})
end.
check_topic_status(ClientId, WolffClientPid, KafkaTopic) ->

View File

@ -245,7 +245,7 @@ t_license_setting_bc(_Config) ->
?assertMatch(#{<<"max_connections">> := 25}, request_dump()),
%% get
GetRes = request(get, uri(["license", "setting"]), []),
%% aslo check that the settings return correctly
%% also check that the settings return correctly
validate_setting(GetRes, <<"75%">>, <<"80%">>, 25),
%% update
Low = <<"50%">>,