feat(kafka_producer): add partitions_limit

Author: Zaiming (Stone) Shi
Date:   2024-01-30 12:05:06 +01:00
Parent: 53c217c383
Commit: 3b42a7425b

9 changed files with 45 additions and 14 deletions

View File

@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@@ -188,6 +188,7 @@ values(producer_values) ->
         ],
         kafka_header_value_encode_mode => none,
         max_inflight => 10,
+        partitions_limit => all_partitions,
         buffer => #{
             mode => <<"hybrid">>,
             per_partition_limit => <<"2GB">>,
@@ -414,6 +415,14 @@ fields(producer_kafka_opts) ->
                 desc => ?DESC(partition_count_refresh_interval)
             }
         )},
+        {partitions_limit,
+            mk(
+                hoconsc:union([all_partitions, pos_integer()]),
+                #{
+                    default => <<"all_partitions">>,
+                    desc => ?DESC(partitions_limit)
+                }
+            )},
         {max_inflight,
             mk(
                 pos_integer(),

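For orientation, the union above admits exactly two value shapes: the atom `all_partitions` or a positive integer. A minimal sketch of that check as a plain predicate (the module and function names here are hypothetical, not part of the change):

-module(partitions_limit_sketch).
-export([is_valid/1]).

%% Hypothetical predicate mirroring hoconsc:union([all_partitions, pos_integer()]).
is_valid(all_partitions) -> true;                  % default: no cap on partitions
is_valid(N) when is_integer(N), N > 0 -> true;     % produce to at most N partitions
is_valid(_) -> false.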
View File

@@ -135,6 +135,7 @@ create_producers_for_bridge_v2(
     KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
     KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
     KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
+    MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
     #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
     TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX),
     IsDryRun =
@@ -144,7 +145,7 @@ create_producers_for_bridge_v2(
             _ ->
                 string:equal(TestIdStart, InstId)
         end,
-    ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+    ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
     WolffProducerConfig = producers_config(
         BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
     ),
@@ -166,7 +167,8 @@ create_producers_for_bridge_v2(
                 kafka_config => KafkaConfig,
                 headers_tokens => KafkaHeadersTokens,
                 ext_headers_tokens => KafkaExtHeadersTokens,
-                headers_val_encode_mode => KafkaHeadersValEncodeMode
+                headers_val_encode_mode => KafkaHeadersValEncodeMode,
+                partitions_limit => MaxPartitions
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
@@ -517,9 +519,9 @@ on_get_channel_status(
     %% `?status_disconnected' will make resource manager try to restart the producers /
     %% connector, thus potentially dropping data held in wolff producer's replayq. The
     %% only exception is if the topic does not exist ("unhealthy target").
-    #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
+    #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
     try
-        ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+        ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
         ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
@@ -528,11 +530,11 @@ on_get_channel_status(
             {?status_connecting, {K, E}}
     end.

-check_topic_and_leader_connections(ClientId, KafkaTopic) ->
+check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             ok = check_topic_status(ClientId, Pid, KafkaTopic),
-            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic);
+            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
         {error, no_such_client} ->
             throw(#{
                 reason => cannot_find_kafka_client,
@@ -562,9 +564,9 @@ check_client_connectivity(ClientId) ->
             {error, {find_client, Reason}}
     end.

-check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
+check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
     Leaders =
-        case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of
+        case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
             {ok, LeadersToCheck} ->
                 %% Kafka is considered healthy as long as any of the partition leaders is reachable.
                 lists:filtermap(
@@ -584,7 +586,8 @@ check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid)
             throw(#{
                 error => no_connected_partition_leader,
                 kafka_client => ClientId,
-                kafka_topic => KafkaTopic
+                kafka_topic => KafkaTopic,
+                partitions_limit => MaxPartitions
             });
         _ ->
             ok
@@ -619,6 +622,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         required_acks := RequiredAcks,
         partition_count_refresh_interval := PCntRefreshInterval,
         max_inflight := MaxInflight,
+        partitions_limit := MaxPartitions,
         buffer := #{
             mode := BufferMode0,
             per_partition_limit := PerPartitionLimit,
@@ -652,7 +656,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,
-        telemetry_meta_data => #{bridge_id => BridgeV2Id}
+        telemetry_meta_data => #{bridge_id => BridgeV2Id},
+        max_partitions => MaxPartitions
     }.

 %% Wolff API is a batch API.

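To make the health-check change concrete: assuming wolff's get_leader_connections/3 returns leader connections only for the capped partition set, the check passes as long as any of those leaders is alive. A stripped-down illustration under that assumption (module, function, and the {Partition, Conn} shape are hypothetical, distilled from the diff above):

-module(leader_health_sketch).
-export([any_leader_alive/1]).

%% Hypothetical reduction of check_if_healthy_leaders/4: wolff has already
%% restricted the leader list to at most MaxPartitions entries, so the health
%% check only needs one live leader connection among what remains.
any_leader_alive(Leaders) ->
    lists:any(
        fun({_Partition, Conn}) -> is_pid(Conn) andalso is_process_alive(Conn) end,
        Leaders
    ).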
View File

@@ -0,0 +1 @@
+Made it possible to limit the number of Kafka partitions used for Kafka data integration.

View File

@@ -201,7 +201,7 @@ defmodule EMQXUmbrella.MixProject do
       [
         {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
         {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-        {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
+        {:wolff, github: "kafka4beam/wolff", tag: "1.10.1"},
         {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
         {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
         {:brod, github: "kafka4beam/brod", tag: "3.16.8"},

View File

@@ -339,4 +339,12 @@ server_name_indication.desc:
 server_name_indication.label:
 """SNI"""

+partitions_limit.desc:
+"""Limit the number of partitions to which data is produced for the given topic.
+The special value `all_partitions` utilizes all partitions of the topic.
+Setting this to a value greater than the topic's total number of partitions has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 }

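The last sentence of the description is worth spelling out: the effective partition count is the minimum of the configured limit and the topic's actual partition count. A tiny hypothetical helper capturing that rule (names are illustrative, not part of the change):

-module(effective_partitions_sketch).
-export([effective_count/2]).

%% Hypothetical: resolve the number of partitions actually used, per the
%% documented semantics of partitions_limit.
effective_count(all_partitions, PartitionCount) ->
    PartitionCount;
effective_count(Limit, PartitionCount) when is_integer(Limit), Limit > 0 ->
    min(Limit, PartitionCount).

For example, effective_count(10, 3) is 3: a limit above the real partition count has no effect, while effective_count(2, 3) restricts production to 2 partitions.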
View File

@@ -386,6 +386,14 @@ consumer_kafka_opts.desc:
 consumer_kafka_opts.label:
 """Kafka Consumer"""

+partitions_limit.desc:
+"""Limit the number of partitions to which data is produced for the given topic.
+The special value `all_partitions` utilizes all partitions of the topic.
+Setting this to a value greater than the topic's total number of partitions has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 max_inflight.desc:
 """Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""