diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6eccaaf09..e91cce600 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -291,12 +291,15 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% do not apply the callback (which is basically to bump success or fail counter) ok. +%% Note: since wolff client has its own replayq that is not managed by +%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, +%% `emqx_resource_manager' will kill the wolff producers and messages might be lost. on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> do_get_status(Pid, KafkaTopic); {error, _Reason} -> - disconnected + connecting end. do_get_status(Client, KafkaTopic) -> @@ -315,10 +318,10 @@ do_get_status(Client, KafkaTopic) -> true -> connected; false -> - disconnected + connecting end; {error, _} -> - disconnected + connecting end. ssl(#{enable := true} = SSL) -> diff --git a/changes/ee/fix-11040.en.md b/changes/ee/fix-11040.en.md new file mode 100644 index 000000000..d01152b00 --- /dev/null +++ b/changes/ee/fix-11040.en.md @@ -0,0 +1 @@ +Fixed a health check issue for Kafka Producer that could lead to loss of messages when the connection to Kafka's brokers was down.