From 25b0e310350da9c614951dec32ed79b6694ef7c2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 13 Jun 2023 13:50:42 -0300 Subject: [PATCH] fix(kafka_producer): do not return disconnected when checking status (r5.1) Fixes https://emqx.atlassian.net/browse/EMQX-10279 Related: https://github.com/emqx/emqx/pull/11038 Since wolff client has its own replayq that lives outside the management of the buffer workers, we must not return disconnected status for such bridge: otherwise, the resource manager will eventually kill the producers and data may be lost. --- .../src/emqx_bridge_kafka_impl_producer.erl | 9 ++++++--- changes/ee/fix-11040.en.md | 1 + 2 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 changes/ee/fix-11040.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index 6eccaaf09..e91cce600 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -291,12 +291,15 @@ on_kafka_ack(_Partition, buffer_overflow_discarded, _Callback) -> %% do not apply the callback (which is basically to bump success or fail counter) ok. +%% Note: since wolff client has its own replayq that is not managed by +%% `emqx_resource_buffer_worker', we must avoid returning `disconnected' here. Otherwise, +%% `emqx_resource_manager' will kill the wolff producers and messages might be lost. on_get_status(_InstId, #{client_id := ClientId, kafka_topic := KafkaTopic}) -> case wolff_client_sup:find_client(ClientId) of {ok, Pid} -> do_get_status(Pid, KafkaTopic); {error, _Reason} -> - disconnected + connecting end. do_get_status(Client, KafkaTopic) -> @@ -315,10 +318,10 @@ do_get_status(Client, KafkaTopic) -> true -> connected; false -> - disconnected + connecting end; {error, _} -> - disconnected + connecting end. ssl(#{enable := true} = SSL) -> diff --git a/changes/ee/fix-11040.en.md b/changes/ee/fix-11040.en.md new file mode 100644 index 000000000..d01152b00 --- /dev/null +++ b/changes/ee/fix-11040.en.md @@ -0,0 +1 @@ +Fixed a health check issue for Kafka Producer that could lead to loss of messages when the connection to Kafka's brokers were down.