diff --git a/apps/emqx_bridge_azure_event_hub/rebar.config b/apps/emqx_bridge_azure_event_hub/rebar.config
index 76e1ae0ef..f3be8f986 100644
--- a/apps/emqx_bridge_azure_event_hub/rebar.config
+++ b/apps/emqx_bridge_azure_event_hub/rebar.config
@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
diff --git a/apps/emqx_bridge_confluent/rebar.config b/apps/emqx_bridge_confluent/rebar.config
index 5e4719106..21fff1a5e 100644
--- a/apps/emqx_bridge_confluent/rebar.config
+++ b/apps/emqx_bridge_confluent/rebar.config
@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
diff --git a/apps/emqx_bridge_kafka/rebar.config b/apps/emqx_bridge_kafka/rebar.config
index e71ccea9f..b71efaf96 100644
--- a/apps/emqx_bridge_kafka/rebar.config
+++ b/apps/emqx_bridge_kafka/rebar.config
@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
index b1032ff6b..be2c124e3 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl
@@ -188,6 +188,7 @@ values(producer_values) ->
         ],
         kafka_header_value_encode_mode => none,
         max_inflight => 10,
+        partitions_limit => all_partitions,
         buffer => #{
             mode => <<"hybrid">>,
             per_partition_limit => <<"2GB">>,
@@ -414,6 +415,14 @@ fields(producer_kafka_opts) ->
                     desc => ?DESC(partition_count_refresh_interval)
                 }
             )},
+        {partitions_limit,
+            mk(
+                hoconsc:union([all_partitions, pos_integer()]),
+                #{
+                    default => <<"all_partitions">>,
+                    desc => ?DESC(partitions_limit)
+                }
+            )},
         {max_inflight,
             mk(
                 pos_integer(),
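The schema field added above accepts either the atom `all_partitions` (the default) or a positive integer. A minimal HOCON sketch of how a user might set the new option on a Kafka producer; the surrounding keys are illustrative and not taken from this diff:

```hocon
# Hypothetical producer settings; only `partitions_limit` is introduced by this PR.
kafka {
  # Produce to at most 5 of the topic's partitions.
  # The default, `all_partitions`, keeps the previous behaviour of using them all.
  partitions_limit = 5
  max_inflight = 10
}
```
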
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
index 459e259d2..9071b807c 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_producer.erl
@@ -135,6 +135,7 @@ create_producers_for_bridge_v2(
     KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
     KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
     KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
+    MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
     #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
     TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX),
     IsDryRun =
@@ -144,7 +145,7 @@ create_producers_for_bridge_v2(
             _ ->
                 string:equal(TestIdStart, InstId)
         end,
-    ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+    ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
     WolffProducerConfig = producers_config(
         BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
     ),
@@ -166,7 +167,8 @@ create_producers_for_bridge_v2(
                 kafka_config => KafkaConfig,
                 headers_tokens => KafkaHeadersTokens,
                 ext_headers_tokens => KafkaExtHeadersTokens,
-                headers_val_encode_mode => KafkaHeadersValEncodeMode
+                headers_val_encode_mode => KafkaHeadersValEncodeMode,
+                partitions_limit => MaxPartitions
             }};
         {error, Reason2} ->
             ?SLOG(error, #{
@@ -517,9 +519,9 @@ on_get_channel_status(
     %% `?status_disconnected' will make resource manager try to restart the producers /
     %% connector, thus potentially dropping data held in wolff producer's replayq. The
     %% only exception is if the topic does not exist ("unhealthy target").
-    #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
+    #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
     try
-        ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+        ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
         ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
             {?status_disconnected, Msg};
         K:E ->
             {?status_connecting, {K, E}}
     end.

-check_topic_and_leader_connections(ClientId, KafkaTopic) ->
+check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             ok = check_topic_status(ClientId, Pid, KafkaTopic),
-            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic);
+            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
         {error, no_such_client} ->
             throw(#{
                 reason => cannot_find_kafka_client,
@@ -562,9 +564,9 @@ check_client_connectivity(ClientId) ->
             {error, {find_client, Reason}}
     end.

-check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
+check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
     Leaders =
-        case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of
+        case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
             {ok, LeadersToCheck} ->
                 %% Kafka is considered healthy as long as any of the partition leaders is reachable.
                 lists:filtermap(
@@ -584,7 +586,8 @@ check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid)
             throw(#{
                 error => no_connected_partition_leader,
                 kafka_client => ClientId,
-                kafka_topic => KafkaTopic
+                kafka_topic => KafkaTopic,
+                partitions_limit => MaxPartitions
             });
         _ ->
             ok
@@ -619,6 +622,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         required_acks := RequiredAcks,
         partition_count_refresh_interval := PCntRefreshInterval,
         max_inflight := MaxInflight,
+        partitions_limit := MaxPartitions,
         buffer := #{
             mode := BufferMode0,
             per_partition_limit := PerPartitionLimit,
@@ -652,7 +656,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,
-        telemetry_meta_data => #{bridge_id => BridgeV2Id}
+        telemetry_meta_data => #{bridge_id => BridgeV2Id},
+        max_partitions => MaxPartitions
     }.

 %% Wolff API is a batch API.
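For orientation, the limit flows along two paths in the producer implementation above: the health check now asks wolff for leader connections of at most `MaxPartitions` partitions (`wolff_client:get_leader_connections/3`, new in wolff 1.10.1), and the producer options map hands the limit to wolff under the `max_partitions` key. A condensed sketch of the health-check side, assuming each leader entry is a `{Partition, ConnectionPid}` pair (the exact element shape is wolff-internal and an assumption here):

```erlang
%% Sketch only; `ClientId' and `Topic' are placeholders, and the shape of
%% the leader list is an assumption about wolff's return value.
-module(partitions_limit_sketch).
-export([any_leader_alive/3]).

%% `MaxPartitions' is either the atom `all_partitions' or a pos_integer().
any_leader_alive(ClientId, Topic, MaxPartitions) ->
    {ok, ClientPid} = wolff_client_sup:find_client(ClientId),
    %% wolff 1.10.1 takes the partition limit as a third argument and only
    %% reports leader connections for that many partitions.
    {ok, Leaders} = wolff_client:get_leader_connections(ClientPid, Topic, MaxPartitions),
    %% Healthy as long as at least one (limited) partition leader is reachable.
    lists:any(
        fun({_Partition, Conn}) -> is_pid(Conn) andalso is_process_alive(Conn) end,
        Leaders
    ).
```

This mirrors the intent of `check_if_healthy_leaders/4` above: the channel stays healthy as long as any leader within the limited partition set is connected.
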
diff --git a/changes/ee/feat-12427.en.md b/changes/ee/feat-12427.en.md
new file mode 100644
index 000000000..c8f0b153c
--- /dev/null
+++ b/changes/ee/feat-12427.en.md
@@ -0,0 +1 @@
+Made it possible to limit the number of Kafka partitions used for Kafka data integration.
diff --git a/mix.exs b/mix.exs
index f2d288740..9f591f61e 100644
--- a/mix.exs
+++ b/mix.exs
@@ -201,7 +201,7 @@ defmodule EMQXUmbrella.MixProject do
       [
         {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
         {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-        {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
+        {:wolff, github: "kafka4beam/wolff", tag: "1.10.1"},
         {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
         {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
         {:brod, github: "kafka4beam/brod", tag: "3.16.8"},
diff --git a/rel/i18n/emqx_bridge_azure_event_hub.hocon b/rel/i18n/emqx_bridge_azure_event_hub.hocon
index 3d77a508a..6b06a480c 100644
--- a/rel/i18n/emqx_bridge_azure_event_hub.hocon
+++ b/rel/i18n/emqx_bridge_azure_event_hub.hocon
@@ -339,4 +339,12 @@ server_name_indication.desc:
 server_name_indication.label:
 """SNI"""

+partitions_limit.desc:
+"""Limit the number of partitions to which data is produced for the given topic.
+The special value `all_partitions` utilizes all partitions of the topic.
+Setting this to a value greater than the topic's total number of partitions has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 }
diff --git a/rel/i18n/emqx_bridge_kafka.hocon b/rel/i18n/emqx_bridge_kafka.hocon
index 86e417be1..b4bf6cd37 100644
--- a/rel/i18n/emqx_bridge_kafka.hocon
+++ b/rel/i18n/emqx_bridge_kafka.hocon
@@ -386,6 +386,14 @@ consumer_kafka_opts.desc:
 consumer_kafka_opts.label:
 """Kafka Consumer"""

+partitions_limit.desc:
+"""Limit the number of partitions to which data is produced for the given topic.
+The special value `all_partitions` utilizes all partitions of the topic.
+Setting this to a value greater than the topic's total number of partitions has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 max_inflight.desc:
 """Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka.
Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""
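As the descriptions above note, a limit higher than the topic's real partition count is effectively ignored. A tiny sketch of that documented clamping rule (the module and function names are hypothetical, not part of the PR):

```erlang
%% Hypothetical helper expressing the documented semantics: `all_partitions'
%% and any limit above the real partition count both yield every partition.
-module(effective_partitions_sketch).
-export([effective_partitions/2]).

effective_partitions(all_partitions, PartitionCount) ->
    PartitionCount;
effective_partitions(Limit, PartitionCount) when is_integer(Limit), Limit > 0 ->
    min(Limit, PartitionCount).
```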