fix(kafka): check Kafka partition leader connectivity

This commit is contained in:
Zaiming (Stone) Shi 2023-02-02 11:20:21 +01:00
parent 13ef30c46c
commit 0eb554a62e
1 changed files with 30 additions and 2 deletions

View File

@ -91,6 +91,7 @@ on_start(InstId, Config) ->
{ok, #{ {ok, #{
message_template => compile_message_template(MessageTemplate), message_template => compile_message_template(MessageTemplate),
client_id => ClientId, client_id => ClientId,
kafka_topic => KafkaTopic,
producers => Producers, producers => Producers,
resource_id => ResourceID resource_id => ResourceID
}}; }};
@ -234,8 +235,35 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) ->
%% do not apply the callback (which is basically to bump success or fail counter) %% do not apply the callback (which is basically to bump success or fail counter)
ok. ok.
on_get_status(_InstId, _State) -> on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) ->
connected. case wolff_client_sup:find_client(ClientId) of
{ok, Pid} ->
do_get_status(Pid, KafkaTopic);
{error, _Reason} ->
disconnected
end.
do_get_status(Client, KafkaTopic) ->
%% TODO: add a wolff_producers:check_connectivity
case wolff_client:get_leader_connections(Client, KafkaTopic) of
{ok, Leaders} ->
%% Kafka is considered healthy as long as any of the partition leader is reachable
case
lists:any(
fun({_Partition, Pid}) ->
is_pid(Pid) andalso erlang:is_process_alive(Pid)
end,
Leaders
)
of
true ->
connected;
false ->
disconnected
end;
{error, _} ->
disconnected
end.
%% Parse comma separated host:port list into a [{Host,Port}] list %% Parse comma separated host:port list into a [{Host,Port}] list
hosts(Hosts) when is_binary(Hosts) -> hosts(Hosts) when is_binary(Hosts) ->