From 0eb554a62e9f987779dcb5600ad05daf558dff4b Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Thu, 2 Feb 2023 11:20:21 +0100 Subject: [PATCH] fix(kafka): check Kafka partition leader connectivity --- .../kafka/emqx_bridge_impl_kafka_producer.erl | 32 +++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 1ac619626..ac98209ed 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -91,6 +91,7 @@ on_start(InstId, Config) -> {ok, #{ message_template => compile_message_template(MessageTemplate), client_id => ClientId, + kafka_topic => KafkaTopic, producers => Producers, resource_id => ResourceID }}; @@ -234,8 +235,35 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% do not apply the callback (which is basically to bump success or fail counter) ok. -on_get_status(_InstId, _State) -> - connected. +on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) -> + case wolff_client_sup:find_client(ClientId) of + {ok, Pid} -> + do_get_status(Pid, KafkaTopic); + {error, _Reason} -> + disconnected + end. + +do_get_status(Client, KafkaTopic) -> + %% TODO: add a wolff_producers:check_connectivity + case wolff_client:get_leader_connections(Client, KafkaTopic) of + {ok, Leaders} -> + %% Kafka is considered healthy as long as any of the partition leader is reachable + case + lists:any( + fun({_Partition, Pid}) -> + is_pid(Pid) andalso erlang:is_process_alive(Pid) + end, + Leaders + ) + of + true -> + connected; + false -> + disconnected + end; + {error, _} -> + disconnected + end. %% Parse comma separated host:port list into a [{Host,Port}] list hosts(Hosts) when is_binary(Hosts) ->