chore: upgrade kafka producer lib wolff to 1.10.4
wolff-1.10.4 handles message_too_large error code from Kafka. It tries to split the too-large batch into single-call batches (for EMQX, one call batch is always one message), and retry. In case a single-call batch is too large, EMQX should increment a failure counter.
This commit is contained in:
parent
460081a22e
commit
a00d9f17b1
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
{erl_opts, [debug_info]}.
|
{erl_opts, [debug_info]}.
|
||||||
{deps, [
|
{deps, [
|
||||||
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
|
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
|
||||||
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
|
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
|
||||||
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
|
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
|
||||||
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
|
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
{erl_opts, [debug_info]}.
|
{erl_opts, [debug_info]}.
|
||||||
{deps, [
|
{deps, [
|
||||||
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
|
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
|
||||||
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
|
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
|
||||||
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
|
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
|
||||||
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
|
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
|
||||||
|
|
|
@ -2,7 +2,7 @@
|
||||||
|
|
||||||
{erl_opts, [debug_info]}.
|
{erl_opts, [debug_info]}.
|
||||||
{deps, [
|
{deps, [
|
||||||
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.2"}}},
|
{wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.4"}}},
|
||||||
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
|
{kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.5"}}},
|
||||||
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
|
{brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
|
||||||
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
|
{brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
|
||||||
|
|
|
@ -211,7 +211,7 @@ ensure_client(ClientId, Hosts, ClientConfig) ->
|
||||||
case wolff_client_sup:find_client(ClientId) of
|
case wolff_client_sup:find_client(ClientId) of
|
||||||
{ok, _Pid} ->
|
{ok, _Pid} ->
|
||||||
ok;
|
ok;
|
||||||
{error, no_such_client} ->
|
{error, #{reason := no_such_client}} ->
|
||||||
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
|
case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
|
||||||
{ok, _} ->
|
{ok, _} ->
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
|
@ -543,13 +543,13 @@ check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
|
||||||
{ok, Pid} ->
|
{ok, Pid} ->
|
||||||
ok = check_topic_status(ClientId, Pid, KafkaTopic),
|
ok = check_topic_status(ClientId, Pid, KafkaTopic),
|
||||||
ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
|
ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
|
||||||
{error, no_such_client} ->
|
{error, #{reason := no_such_client}} ->
|
||||||
throw(#{
|
throw(#{
|
||||||
reason => cannot_find_kafka_client,
|
reason => cannot_find_kafka_client,
|
||||||
kafka_client => ClientId,
|
kafka_client => ClientId,
|
||||||
kafka_topic => KafkaTopic
|
kafka_topic => KafkaTopic
|
||||||
});
|
});
|
||||||
{error, client_supervisor_not_initialized} ->
|
{error, #{reason := client_supervisor_not_initialized}} ->
|
||||||
throw(#{
|
throw(#{
|
||||||
reason => restarting,
|
reason => restarting,
|
||||||
kafka_client => ClientId,
|
kafka_client => ClientId,
|
||||||
|
|
|
@ -0,0 +1,6 @@
|
||||||
|
Improve Kafka producer error handling for `message_too_large`.
|
||||||
|
|
||||||
|
Prior to this change, Kafka producers would retry sending oversized batches (`message_too_large` error) in hopes of a server side configuration fix (`max.message.bytes`).
|
||||||
|
|
||||||
|
Now, oversized messages are automatically split into single-message batches for retry.
|
||||||
|
If a message still exceeds size limits, it will be dropped to maintain data flow.
|
2
mix.exs
2
mix.exs
|
@ -210,7 +210,7 @@ defmodule EMQXUmbrella.MixProject do
|
||||||
{:hstreamdb_erl,
|
{:hstreamdb_erl,
|
||||||
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
|
github: "hstreamdb/hstreamdb_erl", tag: "0.5.18+v0.18.1+ezstd-v1.0.5-emqx1"},
|
||||||
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
|
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
|
||||||
{:wolff, github: "kafka4beam/wolff", tag: "1.10.2"},
|
{:wolff, github: "kafka4beam/wolff", tag: "1.10.4"},
|
||||||
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
|
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.5", override: true},
|
||||||
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
|
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
|
||||||
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},
|
{:brod, github: "kafka4beam/brod", tag: "3.16.8"},
|
||||||
|
|
Loading…
Reference in New Issue