diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl index 6b46de35e..666ab3f8e 100644 --- a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -176,10 +176,22 @@ on_kafka_ack(_Partition, _Offset, _Extra) -> %% Maybe need to bump some counters? ok. -on_get_status(_InstId, #{client_id := ClientID}) -> - case wolff:check_connectivity(ClientID) of - ok -> connected; - _ -> disconnected +on_get_status(_InstId, #{producers := Producers}) -> + %% Just to pick some producer. + RandomVal = emqx_guid:gen(), + FakeMsg = [#{key => RandomVal, value => RandomVal}], + %% Note: we must not check the connectivity to Kafka itself, but + %% only if there are producers available. Otherwise, if Kafka + %% goes down, the resource will be considered down and messages + %% *will not* be sent to the wolff producers and buffered, + %% effectively losing the message as there are no buffer workers + %% for Kafka producer. + try wolff_producers:pick_producer(Producers, FakeMsg) of + {_Partition, _Pid} -> + connected + catch + error:{producer_down, _} -> + disconnected end. %% Parse comma separated host:port list into a [{Host,Port}] list