diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config index 90be538b3..6169ef8ea 100644 --- a/apps/emqx_bridge_azure_event_hub/rebar.config +++ b/apps/emqx_bridge_azure_event_hub/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}} +{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config index 0c0c2eece..4f700e4cd 100644 --- a/apps/emqx_bridge_confluent/rebar.config +++ b/apps/emqx_bridge_confluent/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}} +{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config index b69ec1262..db7d619e1 100644 --- a/apps/emqx_bridge_kafka/rebar.config +++ b/apps/emqx_bridge_kafka/rebar.config @@ -1,6 +1,6 @@ %% -*- mode: erlang; -*- {erl_opts, [debug_info]}. -{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}} +{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl index bf8c76bee..7caab1d87 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl @@ -539,7 +539,7 @@ check_topic_and_leader_connections(ClientId, KafkaTopic) -> kafka_client => ClientId, kafka_topic => KafkaTopic }); - {error, restarting} -> + {error, client_supervisor_not_initialized} -> throw(#{ reason => restarting, kafka_client => ClientId, @@ -620,16 +620,19 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> partition_count_refresh_interval := PCntRefreshInterval, max_inflight := MaxInflight, buffer := #{ - mode := BufferMode, + mode := BufferMode0, per_partition_limit := PerPartitionLimit, segment_bytes := SegmentBytes, - memory_overload_protection := MemOLP0 + memory_overload_protection := MemOLP } } = Input, - MemOLP = - case os:type() of - {unix, linux} -> MemOLP0; - _ -> false + %% avoid creating dirs for probing producers + BufferMode = + case IsDryRun of + true -> + memory; + false -> + BufferMode0 end, {OffloadMode, ReplayqDir} = case BufferMode of @@ -638,7 +641,6 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) -> hybrid -> {true, replayq_dir(BridgeType, BridgeName)} end, #{ - name => make_producer_name(BridgeType, BridgeName, IsDryRun), partitioner => partitioner(PartitionStrategy), partition_count_refresh_interval_seconds => PCntRefreshInterval, replayq_dir => ReplayqDir, @@ -669,18 +671,6 @@ replayq_dir(BridgeType, BridgeName) -> ]), filename:join([emqx:data_dir(), "kafka", DirName]). -%% Producer name must be an atom which will be used as a ETS table name for -%% partition worker lookup. -make_producer_name(_BridgeType, _BridgeName, true = _IsDryRun) -> - %% It is a dry run and we don't want to leak too many atoms - %% so we use the default producer name instead of creating - %% an unique name. - probing_wolff_producers; -make_producer_name(BridgeType, BridgeName, _IsDryRun) -> - %% Woff needs an atom for ets table name registration. The assumption here is - %% that bridges with new names are not often created. - binary_to_atom(iolist_to_binary([BridgeType, "_", bin(BridgeName)])). - with_log_at_error(Fun, Log) -> try Fun() diff --git a/changes/ee/feat-12194.en.md b/changes/ee/feat-12194.en.md new file mode 100644 index 000000000..6d65fc5bd --- /dev/null +++ b/changes/ee/feat-12194.en.md @@ -0,0 +1 @@ +Improve Kafka producer performance. diff --git a/mix.exs b/mix.exs index 415db4331..0e387884b 100644 --- a/mix.exs +++ b/mix.exs @@ -200,7 +200,7 @@ defmodule EMQXUmbrella.MixProject do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, - {:wolff, github: "kafka4beam/wolff", tag: "1.8.0"}, + {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"},