Merge pull request #12427 from zmstone/0130-limit-kafka-partitions

0130 limit kafka partitions
Zaiming (Stone) Shi 2024-02-01 14:39:52 +01:00 committed by GitHub
commit 3e518c1876
12 changed files with 58 additions and 22 deletions

View File

@@ -257,7 +257,6 @@ t_session_unsubscription_idempotency(Config) ->
     ?check_trace(
         #{timetrap => 30_000},
         begin
-            #{timetrap => 20_000},
             ?force_ordering(
                 #{
                     ?snk_kind := persistent_session_ds_subscription_delete
@@ -498,9 +497,7 @@ do_t_session_expiration(_Config, Opts) ->
     ok.

 t_session_gc(Config) ->
     GCInterval = ?config(gc_interval, Config),
-    [Node1, Node2, _Node3] = Nodes = ?config(nodes, Config),
-    CoreNodes = [Node1, Node2],
     [
         Port1,
         Port2,

View File

@@ -85,13 +85,13 @@
 %% Guards
 -define(IS_SUBID(Id), (is_binary(Id) orelse is_atom(Id))).

--define(cast_or_eval(Pid, Msg, Expr),
-    case Pid =:= self() of
-        true ->
+-define(cast_or_eval(PICK, Msg, Expr),
+    case PICK of
+        __X_Pid when __X_Pid =:= self() ->
             _ = Expr,
             ok;
-        false ->
-            cast(Pid, Msg)
+        __X_Pid ->
+            cast(__X_Pid, Msg)
     end
 ).
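
The rewrite matters because the old macro body referenced its first argument twice: once in the equality test and once in the cast. Binding the argument to __X_Pid through a case clause makes any caller-supplied expression evaluate exactly once. A comment-only sketch of the two expansions, using a hypothetical pick_pid() as the argument:

%% Before: ?cast_or_eval(pick_pid(), Msg, Expr) expanded to roughly
%%     case pick_pid() =:= self() of
%%         true -> _ = Expr, ok;
%%         false -> cast(pick_pid(), Msg)   %% pick_pid() evaluated a second time
%%     end
%% After: the argument is evaluated once and the result is reused:
%%     case pick_pid() of
%%         __X_Pid when __X_Pid =:= self() -> _ = Expr, ok;
%%         __X_Pid -> cast(__X_Pid, Msg)
%%     end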

View File

@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@@ -2,7 +2,7 @@
 {erl_opts, [debug_info]}.
 {deps, [
-    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.9.1"}}},
+    {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.10.1"}}},
     {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.3"}}},
     {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.1"}}},
     {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.8"}}},

View File

@@ -188,6 +188,7 @@ values(producer_values) ->
         ],
         kafka_header_value_encode_mode => none,
         max_inflight => 10,
+        partitions_limit => all_partitions,
         buffer => #{
             mode => <<"hybrid">>,
             per_partition_limit => <<"2GB">>,
@@ -414,6 +415,14 @@ fields(producer_kafka_opts) ->
                     desc => ?DESC(partition_count_refresh_interval)
                 }
             )},
+        {partitions_limit,
+            mk(
+                hoconsc:union([all_partitions, pos_integer()]),
+                #{
+                    default => <<"all_partitions">>,
+                    desc => ?DESC(partitions_limit)
+                }
+            )},
         {max_inflight,
             mk(
                 pos_integer(),
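
Per the union type above, the new field accepts either the atom `all_partitions` or a positive integer. A quick sketch of the two accepted forms (the surrounding bindings are illustrative, not EMQX code):

%% Both values validate against hoconsc:union([all_partitions, pos_integer()]):
Default = #{partitions_limit => all_partitions}, %% default: utilize every partition
Limited = #{partitions_limit => 5}.              %% produce to at most 5 partitions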

View File

@@ -135,6 +135,7 @@ create_producers_for_bridge_v2(
     KafkaHeadersTokens = preproc_kafka_headers(maps:get(kafka_headers, KafkaConfig, undefined)),
     KafkaExtHeadersTokens = preproc_ext_headers(maps:get(kafka_ext_headers, KafkaConfig, [])),
     KafkaHeadersValEncodeMode = maps:get(kafka_header_value_encode_mode, KafkaConfig, none),
+    MaxPartitions = maps:get(partitions_limit, KafkaConfig, all_partitions),
     #{name := BridgeName} = emqx_bridge_v2:parse_id(BridgeV2Id),
     TestIdStart = string:find(BridgeV2Id, ?TEST_ID_PREFIX),
     IsDryRun =
@@ -144,7 +145,7 @@ create_producers_for_bridge_v2(
            _ ->
                string:equal(TestIdStart, InstId)
        end,
-    ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+    ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
     WolffProducerConfig = producers_config(
         BridgeType, BridgeName, KafkaConfig, IsDryRun, BridgeV2Id
     ),
@@ -166,7 +167,8 @@ create_producers_for_bridge_v2(
                kafka_config => KafkaConfig,
                headers_tokens => KafkaHeadersTokens,
                ext_headers_tokens => KafkaExtHeadersTokens,
-               headers_val_encode_mode => KafkaHeadersValEncodeMode
+               headers_val_encode_mode => KafkaHeadersValEncodeMode,
+               partitions_limit => MaxPartitions
            }};
        {error, Reason2} ->
            ?SLOG(error, #{
@@ -517,9 +519,9 @@ on_get_channel_status(
     %% `?status_disconnected' will make resource manager try to restart the producers /
     %% connector, thus potentially dropping data held in wolff producer's replayq. The
     %% only exception is if the topic does not exist ("unhealthy target").
-    #{kafka_topic := KafkaTopic} = maps:get(ChannelId, Channels),
+    #{kafka_topic := KafkaTopic, partitions_limit := MaxPartitions} = maps:get(ChannelId, Channels),
     try
-        ok = check_topic_and_leader_connections(ClientId, KafkaTopic),
+        ok = check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions),
         ?status_connected
     catch
         throw:{unhealthy_target, Msg} ->
@@ -528,11 +530,11 @@
             {?status_connecting, {K, E}}
     end.

-check_topic_and_leader_connections(ClientId, KafkaTopic) ->
+check_topic_and_leader_connections(ClientId, KafkaTopic, MaxPartitions) ->
     case wolff_client_sup:find_client(ClientId) of
         {ok, Pid} ->
             ok = check_topic_status(ClientId, Pid, KafkaTopic),
-            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic);
+            ok = check_if_healthy_leaders(ClientId, Pid, KafkaTopic, MaxPartitions);
         {error, no_such_client} ->
             throw(#{
                 reason => cannot_find_kafka_client,
@@ -562,9 +564,9 @@ check_client_connectivity(ClientId) ->
             {error, {find_client, Reason}}
     end.

-check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid) ->
+check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic, MaxPartitions) when is_pid(ClientPid) ->
     Leaders =
-        case wolff_client:get_leader_connections(ClientPid, KafkaTopic) of
+        case wolff_client:get_leader_connections(ClientPid, KafkaTopic, MaxPartitions) of
             {ok, LeadersToCheck} ->
                 %% Kafka is considered healthy as long as any of the partition leaders is reachable.
                 lists:filtermap(
@@ -584,7 +586,8 @@ check_if_healthy_leaders(ClientId, ClientPid, KafkaTopic) when is_pid(ClientPid)
             throw(#{
                 error => no_connected_partition_leader,
                 kafka_client => ClientId,
-                kafka_topic => KafkaTopic
+                kafka_topic => KafkaTopic,
+                partitions_limit => MaxPartitions
             });
         _ ->
             ok
@@ -619,6 +622,7 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         required_acks := RequiredAcks,
         partition_count_refresh_interval := PCntRefreshInterval,
         max_inflight := MaxInflight,
+        partitions_limit := MaxPartitions,
         buffer := #{
             mode := BufferMode0,
             per_partition_limit := PerPartitionLimit,
@@ -652,7 +656,8 @@ producers_config(BridgeType, BridgeName, Input, IsDryRun, BridgeV2Id) ->
         max_batch_bytes => MaxBatchBytes,
         max_send_ahead => MaxInflight - 1,
         compression => Compression,
-        telemetry_meta_data => #{bridge_id => BridgeV2Id}
+        telemetry_meta_data => #{bridge_id => BridgeV2Id},
+        max_partitions => MaxPartitions
     }.

 %% Wolff API is a batch API.
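
End to end, the new `partitions_limit` value travels from the channel config into wolff: it is read in create_producers_for_bridge_v2, passed to the topic and leader health checks via wolff_client:get_leader_connections/3, and handed to the producers as `max_partitions`. A hedged sketch of the resulting producer setup, assuming wolff's ensure_supervised_producers/3 entry point (not shown in this diff); the client id, topic, and option values are illustrative:

%% Sketch only: `max_partitions` is the option this change wires through;
%% the other keys mirror the producers_config/5 output above.
ProducerCfg = #{
    max_partitions => 5,            %% from partitions_limit
    max_send_ahead => 9,            %% max_inflight - 1
    telemetry_meta_data => #{bridge_id => <<"bridge:v2:example">>}
},
{ok, Producers} = wolff:ensure_supervised_producers(<<"client1">>, <<"topic1">>, ProducerCfg).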

View File

@@ -0,0 +1 @@
+Made it possible to limit the number of Kafka partitions to utilize for Kafka data integration.

View File

@@ -201,7 +201,7 @@ defmodule EMQXUmbrella.MixProject do
       [
         {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
         {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.13", override: true},
-        {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
+        {:wolff, github: "kafka4beam/wolff", tag: "1.10.1"},
         {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
         {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},
         {:brod, github: "kafka4beam/brod", tag: "3.16.8"},

View File

@@ -339,4 +339,12 @@ server_name_indication.desc:
 server_name_indication.label:
 """SNI"""

+partitions_limit.desc:
+"""Limit the number of partitions to which data is produced for the given topic.
+The special value `all_partitions` utilizes all partitions of the topic.
+Setting this to a value greater than the total number of partitions in Kafka has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 }
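
In other words, the effective number of partitions wolff produces to is the smaller of the topic's partition count and the configured limit (assuming wolff caps at the lower-numbered partitions; the numbers below are made up):

%% Hypothetical: a 10-partition topic with partitions_limit = 3 dispatches
%% messages to 3 partitions only, while partitions_limit = 20 behaves the
%% same as all_partitions.
Effective = fun(Total, all_partitions) -> Total;
               (Total, Limit) when is_integer(Limit) -> min(Total, Limit)
            end,
10 = Effective(10, all_partitions),
3 = Effective(10, 3).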

View File

@@ -277,6 +277,14 @@ This value is to specify the size of each on-disk buffer file."""
 buffer_segment_bytes.label:
 """Segment File Bytes"""

+partitions_limit.desc:
+"""Limit the number of partitions to which data is produced for the given topic.
+The special value `all_partitions` utilizes all partitions of the topic.
+Setting this to a value greater than the total number of partitions in Kafka has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 max_inflight.desc:
 """Maximum number of batches allowed for Confluent producer (per-partition) to send before receiving acknowledgement from Confluent. A greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""

View File

@@ -386,6 +386,14 @@ consumer_kafka_opts.desc:
 consumer_kafka_opts.label:
 """Kafka Consumer"""

+partitions_limit.desc:
+"""Limit the number of partitions to which data is produced for the given topic.
+The special value `all_partitions` utilizes all partitions of the topic.
+Setting this to a value greater than the total number of partitions in Kafka has no effect."""
+
+partitions_limit.label:
+"""Max Partitions"""
+
 max_inflight.desc:
 """Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. A greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""