diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 269239620..8cd5ee427 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 0519e39c9..a969ac83b 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index 7c98bf571..fd7f03da8 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -2,7 +2,7 @@ {erl_opts, [debug_info]}. {deps, [ - {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}}, + {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}}, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}}, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index fa7bc67ac..53f2603d4 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -211,7 +211,7 @@ ensure_client(ClientId, Hosts, ClientConfig) -> case wolff_client_sup:find_client(ClientId) of {ok, _Pid} -> ok; - {error, no_such_client} -> + {error, #{reason := no_such_client}} -> case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of {ok, _} -> ?SLOG(info, #{ @@ -543,13 +543,13 @@ check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) -> {ok, Pid} -> ok = check_topic_status(ClientId, Pid, KafkaTopic), ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions); - {error, no_such_client} -> + {error, #{reason := no_such_client}} -> throw(#{ reason => cannot_find_kafka_client, kafka_client => ClientId, kafka_topic => KafkaTopic }); - {error, client_supervisor_not_initialized} -> + {error, #{reason := client_supervisor_not_initialized}} -> throw(#{ reason => restarting, kafka_client => ClientId, diff --git a/changes/ee/fix-13079.en.md b/changes/ee/fix-13079.en.md new file mode 100644 index 000000000..c469bdf6e --- /dev/null +++ b/changes/ee/fix-13079.en.md @@ -0,0 +1,6 @@ +Improve Kafka producer error handling for `message_too_large`. + +Prior to this change, Kafka producers would retry sending oversized batches (`message_too_large` error) in hopes of a server side configuration fix (`max.message.bytes`). + +Now, oversized messages are automatically split into single-message batches for retry. +If a message still exceeds size limits, it will be dropped to maintain data flow. diff --git a/mix.exs b/mix.exs index f3e91096e..85aa88d62 100644 --- a/mix.exs +++ b/mix.exs @@ -210,7 +210,7 @@ defmodule EMQXUmbrella.MixProject do {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.10.2"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.10.4"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"},