perf: upgrade to wolff 1.9.1 for better performance

Upgrade from 1.8.0:
- wollf-1.9.0 has the global stats disabled by default
- wolff-1.9.1 improved client pid lookup performance
This commit is contained in:
Zaiming (Stone) Shi 2023-12-18 18:17:50 +01:00
parent e9a91881cf
commit 7338e394c8
6 changed files with 15 additions and 24 deletions

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*- %% -*- mode: erlang; -*-
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}} {deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*- %% -*- mode: erlang; -*-
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}} {deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@ -1,6 +1,6 @@
%% -*- mode: erlang; -*- %% -*- mode: erlang; -*-
{erl_opts, [debug_info]}. {erl_opts, [debug_info]}.
{deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.8.0"}}} {deps, [ {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}}
, {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}} , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}}
, {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}} , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}}
, {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}} , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}}

View File

@ -539,7 +539,7 @@ check_topic_and_leader_connections(ClientId, KafkaTopic) ->
kafka_client => ClientId, kafka_client => ClientId,
kafka_topic => KafkaTopic kafka_topic => KafkaTopic
}); });
{error, restarting} -> {error, client_supervisor_not_initialized} ->
throw(#{ throw(#{
reason => restarting, reason => restarting,
kafka_client => ClientId, kafka_client => ClientId,
@ -620,16 +620,19 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
partition_count_refresh_interval := PCntRefreshInterval, partition_count_refresh_interval := PCntRefreshInterval,
max_inflight := MaxInflight, max_inflight := MaxInflight,
buffer := #{ buffer := #{
mode := BufferMode, mode := BufferMode0,
per_partition_limit := PerPartitionLimit, per_partition_limit := PerPartitionLimit,
segment_bytes := SegmentBytes, segment_bytes := SegmentBytes,
memory_overload_protection := MemOLP0 memory_overload_protection := MemOLP
} }
} = Input, } = Input,
MemOLP = %% avoid creating dirs for probing producers
case os:type() of BufferMode =
{unix, linux} -> MemOLP0; case IsDryRun of
_ -> false true ->
memory;
false ->
BufferMode0
end, end,
{OffloadMode, ReplayqDir} = {OffloadMode, ReplayqDir} =
case BufferMode of case BufferMode of
@ -638,7 +641,6 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
hybrid -> {true, replayq_dir(BridgeType, BridgeName)} hybrid -> {true, replayq_dir(BridgeType, BridgeName)}
end, end,
#{ #{
name => make_producer_name(BridgeType, BridgeName, IsDryRun),
partitioner => partitioner(PartitionStrategy), partitioner => partitioner(PartitionStrategy),
partition_count_refresh_interval_seconds => PCntRefreshInterval, partition_count_refresh_interval_seconds => PCntRefreshInterval,
replayq_dir => ReplayqDir, replayq_dir => ReplayqDir,
@ -669,18 +671,6 @@ replayq_dir(BridgeType, BridgeName) ->
]), ]),
filename:join([emqx:data_dir(), "kafka", DirName]). filename:join([emqx:data_dir(), "kafka", DirName]).
%% Producer name must be an atom which will be used as a ETS table name for
%% partition worker lookup.
make_producer_name(_BridgeType, _BridgeName, true = _IsDryRun) ->
%% It is a dry run and we don't want to leak too many atoms
%% so we use the default producer name instead of creating
%% an unique name.
probing_wolff_producers;
make_producer_name(BridgeType, BridgeName, _IsDryRun) ->
%% Woff needs an atom for ets table name registration. The assumption here is
%% that bridges with new names are not often created.
binary_to_atom(iolist_to_binary([BridgeType, "_", bin(BridgeName)])).
with_log_at_error(Fun, Log) -> with_log_at_error(Fun, Log) ->
try try
Fun() Fun()

View File

@ -0,0 +1 @@
Improve Kafka producer performance.

View File

@ -200,7 +200,7 @@ defmodule EMQXUmbrella.MixProject do
[ [
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.8.0"}, {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
{:brod, github: "kafka4beam/brod", tag: "3.16.8"}, {:brod, github: "kafka4beam/brod", tag: "3.16.8"},