From 8e59319bfe255e617c54b9d778c6ab4f920a9f61 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 2 Jan 2023 15:04:53 -0300 Subject: [PATCH] fix(kafka_producer): fix message loss when kafka connection is down --- .../kafka/emqx_bridge_impl_kafka_producer.erl | 20 +++++++++++++++---- 1 file changed, 16 insertions(+), 4 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 6b46de35e..666ab3f8e 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -176,10 +176,22 @@ on_kafka_ack(_Partition, _Offset, _Extra) -> %% Maybe need to bump some counters? ok. -on_get_status(_InstId, #{client_id := ClientID}) -> - case wolff:check_connectivity(ClientID) of - ok -> connected; - _ -> disconnected +on_get_status(_InstId, #{producers := Producers}) -> + %% Just to pick some producer. + RandomVal = emqx_guid:gen(), + FakeMsg = [#{key => RandomVal, value => RandomVal}], + %% Note: we must not check the connectivity to Kafka itself, but + %% only if there are producers available. Otherwise, if Kafka + %% goes down, the resource will be considered down and messages + %% *will not* be sent to the wolff producers and buffered, + %% effectively losing the message as there are no buffer workers + %% for Kafka producer. + try wolff_producers:pick_producer(Producers, FakeMsg) of + {_Partition, _Pid} -> + connected + catch + error:{producer_down, _} -> + disconnected end. %% Parse comma separated host:port list into a [{Host,Port}] list