diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl index d4e76dfc8..8fe0d6351 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -84,6 +84,14 @@ fields(source_parameters) -> required => true, desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic) } + )}, + {group_id, + mk( + binary(), + #{ + required => false, + desc => ?DESC(group_id) + } )} | Fields ]; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c4f66dfff..2f0aff68d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -26,7 +26,7 @@ ]). -ifdef(TEST). --export([consumer_group_id/1]). +-export([consumer_group_id/2]). -endif. -include_lib("emqx/include/logger.hrl"). @@ -50,6 +50,7 @@ parameters := source_parameters() }. -type source_parameters() :: #{ + group_id => binary(), key_encoding_mode := encoding_mode(), max_batch_bytes := emqx_schema:bytesize(), max_rejoin_attempts := non_neg_integer(), @@ -431,7 +432,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> %% note: the group id should be the same for all nodes in the %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. - GroupID = consumer_group_id(BridgeName), + GroupID = consumer_group_id(Params0, BridgeName), %% earliest or latest BeginOffset = OffsetResetPolicy0, OffsetResetPolicy = @@ -623,8 +624,10 @@ log_when_error(Fun, Log) -> }) end. --spec consumer_group_id(atom() | binary()) -> binary(). -consumer_group_id(BridgeName0) -> +-spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary(). +consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) -> + GroupId; +consumer_group_id(_ConsumerParams, BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 56aabb1c3..24405eb9e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1542,7 +1542,7 @@ t_receive_after_recovery(Config) -> _Interval = 500, _NAttempts = 20, begin - GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA), + GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, KafkaNameA), {ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets( KafkaClientId, GroupId ), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl index 8568e2f62..2f66b9c0e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -351,3 +351,24 @@ t_bad_bootstrap_host(Config) -> ) ), ok. + +t_custom_group_id(Config) -> + ?check_trace( + begin + #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config), + CustomGroupId = <<"my_group_id">>, + {ok, {{_, 201, _}, _, _}} = + emqx_bridge_v2_testlib:create_bridge_api( + Config, + #{<<"parameters">> => #{<<"group_id">> => CustomGroupId}} + ), + [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts), + ?assertMatch( + {ok, [{_, CustomGroupId, _}]}, + brod:list_groups(Endpoint, _ConnOpts = #{}) + ), + ok + end, + [] + ), + ok. diff --git a/changes/ee/feat-12961.en.md b/changes/ee/feat-12961.en.md new file mode 100644 index 000000000..f70338751 --- /dev/null +++ b/changes/ee/feat-12961.en.md @@ -0,0 +1 @@ +Added the option to customize group ids in advance for Kafka Consumer sources. diff --git a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon index 6f29d7cc8..cf6e18a68 100644 --- a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon +++ b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon @@ -15,4 +15,9 @@ emqx_bridge_kafka_consumer_schema { config_connector.label: """Kafka Consumer Client Configuration""" + group_id.desc: + """Custom group identifier to be used for this source. If omitted, one based off the source name will be automatically generated.""" + group_id.label: + """Kafka Consumer Custom Group Id""" + }