From 14ef0b1e5189453944829eaafc59dfb1b8133dcd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 30 Apr 2024 15:23:24 -0300 Subject: [PATCH] feat(kafka consumer): allow custom group id Fixes https://emqx.atlassian.net/browse/EMQX-12273 Fixes EMQX-12273 When consuming messages in Kafka in Alibaba Cloud, the group needs to be configured in advance, and then the consumer can use the group to consume messages. Automatic group creation is generally not allowed online. --- .../src/emqx_bridge_kafka_consumer_schema.erl | 8 +++++++ .../src/emqx_bridge_kafka_impl_consumer.erl | 11 ++++++---- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 2 +- .../emqx_bridge_v2_kafka_consumer_SUITE.erl | 21 +++++++++++++++++++ changes/ee/feat-12961.en.md | 1 + .../emqx_bridge_kafka_consumer_schema.hocon | 5 +++++ 6 files changed, 43 insertions(+), 5 deletions(-) create mode 100644 changes/ee/feat-12961.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl index d4e76dfc8..8fe0d6351 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -84,6 +84,14 @@ fields(source_parameters) -> required => true, desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic) } + )}, + {group_id, + mk( + binary(), + #{ + required => false, + desc => ?DESC(group_id) + } )} | Fields ]; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c4f66dfff..2f0aff68d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -26,7 +26,7 @@ ]). -ifdef(TEST). --export([consumer_group_id/1]). +-export([consumer_group_id/2]). -endif. -include_lib("emqx/include/logger.hrl"). @@ -50,6 +50,7 @@ parameters := source_parameters() }. -type source_parameters() :: #{ + group_id => binary(), key_encoding_mode := encoding_mode(), max_batch_bytes := emqx_schema:bytesize(), max_rejoin_attempts := non_neg_integer(), @@ -431,7 +432,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> %% note: the group id should be the same for all nodes in the %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. - GroupID = consumer_group_id(BridgeName), + GroupID = consumer_group_id(Params0, BridgeName), %% earliest or latest BeginOffset = OffsetResetPolicy0, OffsetResetPolicy = @@ -623,8 +624,10 @@ log_when_error(Fun, Log) -> }) end. --spec consumer_group_id(atom() | binary()) -> binary(). -consumer_group_id(BridgeName0) -> +-spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary(). +consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) -> + GroupId; +consumer_group_id(_ConsumerParams, BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 56aabb1c3..24405eb9e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1542,7 +1542,7 @@ t_receive_after_recovery(Config) -> _Interval = 500, _NAttempts = 20, begin - GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA), + GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, KafkaNameA), {ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets( KafkaClientId, GroupId ), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl index 8568e2f62..2f66b9c0e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -351,3 +351,24 @@ t_bad_bootstrap_host(Config) -> ) ), ok. + +t_custom_group_id(Config) -> + ?check_trace( + begin + #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config), + CustomGroupId = <<"my_group_id">>, + {ok, {{_, 201, _}, _, _}} = + emqx_bridge_v2_testlib:create_bridge_api( + Config, + #{<<"parameters">> => #{<<"group_id">> => CustomGroupId}} + ), + [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts), + ?assertMatch( + {ok, [{_, CustomGroupId, _}]}, + brod:list_groups(Endpoint, _ConnOpts = #{}) + ), + ok + end, + [] + ), + ok. diff --git a/changes/ee/feat-12961.en.md b/changes/ee/feat-12961.en.md new file mode 100644 index 000000000..f70338751 --- /dev/null +++ b/changes/ee/feat-12961.en.md @@ -0,0 +1 @@ +Added the option to customize group ids in advance for Kafka Consumer sources. diff --git a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon index 6f29d7cc8..cf6e18a68 100644 --- a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon +++ b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon @@ -15,4 +15,9 @@ emqx_bridge_kafka_consumer_schema { config_connector.label: """Kafka Consumer Client Configuration""" + group_id.desc: + """Custom group identifier to be used for this source. If omitted, one based off the source name will be automatically generated.""" + group_id.label: + """Kafka Consumer Custom Group Id""" + }