diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl
index d4e76dfc8..a4bb48aec 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl
@@ -84,6 +84,17 @@ fields(source_parameters) ->
                     required => true,
                     desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic)
                 }
+            )},
+        {group_id,
+            mk(
+                binary(),
+                #{
+                    required => false,
+                    validator => [
+                        emqx_resource_validator:not_empty("Group id must not be empty")
+                    ],
+                    desc => ?DESC(group_id)
+                }
             )}
         | Fields
     ];
diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl
index c4f66dfff..2f0aff68d 100644
--- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl
+++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl
@@ -26,7 +26,7 @@
 ]).
 
 -ifdef(TEST).
--export([consumer_group_id/1]).
+-export([consumer_group_id/2]).
 -endif.
 
 -include_lib("emqx/include/logger.hrl").
@@ -50,6 +50,7 @@
         parameters := source_parameters()
     }.
 -type source_parameters() :: #{
+    group_id => binary(),
     key_encoding_mode := encoding_mode(),
     max_batch_bytes := emqx_schema:bytesize(),
     max_rejoin_attempts := non_neg_integer(),
@@ -431,7 +432,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
     %% note: the group id should be the same for all nodes in the
     %% cluster, so that the load gets distributed between all
     %% consumers and we don't repeat messages in the same cluster.
-    GroupID = consumer_group_id(BridgeName),
+    GroupID = consumer_group_id(Params0, BridgeName),
     %% earliest or latest
     BeginOffset = OffsetResetPolicy0,
     OffsetResetPolicy =
@@ -623,8 +624,10 @@ log_when_error(Fun, Log) ->
         })
     end.
 
--spec consumer_group_id(atom() | binary()) -> binary().
-consumer_group_id(BridgeName0) ->
+-spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary().
+consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) ->
+    GroupId;
+consumer_group_id(_ConsumerParams, BridgeName0) ->
     BridgeName = to_bin(BridgeName0),
     <<"emqx-kafka-consumer-", BridgeName/binary>>.
 
diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
index 56aabb1c3..24405eb9e 100644
--- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
+++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl
@@ -1542,7 +1542,7 @@ t_receive_after_recovery(Config) ->
         _Interval = 500,
         _NAttempts = 20,
         begin
-            GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA),
+            GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, KafkaNameA),
             {ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets(
                 KafkaClientId, GroupId
             ),
diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
index 54f6f9efc..4de01c98d 100644
--- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
+++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl
@@ -204,6 +204,41 @@ test_keepalive_validation(Name, Conf) ->
     [?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
         [?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].
 
+%% assert compatibility
+bridge_schema_json_test() ->
+    JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
+    Map = emqx_utils_json:decode(JSON),
+    Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
+    ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
+
+custom_group_id_test() ->
+    BaseConfig = kafka_consumer_source_config(),
+    BadSourceConfig = emqx_utils_maps:deep_merge(
+        BaseConfig,
+        #{<<"parameters">> => #{<<"group_id">> => <<>>}}
+    ),
+    ?assertThrow(
+        {_, [
+            #{
+                path := "sources.kafka_consumer.my_consumer.parameters.group_id",
+                reason := "Group id must not be empty"
+            }
+        ]},
+        emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, BadSourceConfig)
+    ),
+
+    CustomId = <<"custom_id">>,
+    OkSourceConfig = emqx_utils_maps:deep_merge(
+        BaseConfig,
+        #{<<"parameters">> => #{<<"group_id">> => CustomId}}
+    ),
+    ?assertMatch(
+        #{<<"parameters">> := #{<<"group_id">> := CustomId}},
+        emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, OkSourceConfig)
+    ),
+
+    ok.
+
 %%===========================================================================
 %% Helper functions
 %%===========================================================================
@@ -355,9 +390,21 @@ kafka_consumer_hocon() ->
     "\n        }"
     "\n    }".
 
-%% assert compatibility
-bridge_schema_json_test() ->
-    JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
-    Map = emqx_utils_json:decode(JSON),
-    Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
-    ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
+kafka_consumer_source_config() ->
+    #{
+        <<"enable">> => true,
+        <<"connector">> => <<"my_connector">>,
+        <<"parameters">> =>
+            #{
+                <<"key_encoding_mode">> => <<"none">>,
+                <<"max_batch_bytes">> => <<"896KB">>,
+                <<"max_rejoin_attempts">> => <<"5">>,
+                <<"offset_reset_policy">> => <<"latest">>,
+                <<"topic">> => <<"please override">>,
+                <<"value_encoding_mode">> => <<"none">>
+            },
+        <<"resource_opts">> => #{
+            <<"health_check_interval">> => <<"2s">>,
+            <<"resume_interval">> => <<"2s">>
+        }
+    }.
diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl
index 8568e2f62..1372bf71c 100644
--- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl
+++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl
@@ -351,3 +351,28 @@ t_bad_bootstrap_host(Config) ->
         )
     ),
     ok.
+
+t_custom_group_id(Config) ->
+    ?check_trace(
+        begin
+            #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
+            CustomGroupId = <<"my_group_id">>,
+            {ok, {{_, 201, _}, _, _}} =
+                emqx_bridge_v2_testlib:create_bridge_api(
+                    Config,
+                    #{<<"parameters">> => #{<<"group_id">> => CustomGroupId}}
+                ),
+            [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
+            ?retry(100, 10, begin
+                {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
+                ?assertMatch(
+                    [_],
+                    [Group || Group = {_, Id, _} <- Groups, Id == CustomGroupId],
+                    #{groups => Groups}
+                )
+            end),
+            ok
+        end,
+        []
+    ),
+    ok.
diff --git a/changes/ee/feat-12961.en.md b/changes/ee/feat-12961.en.md
new file mode 100644
index 000000000..f70338751
--- /dev/null
+++ b/changes/ee/feat-12961.en.md
@@ -0,0 +1 @@
+Added the option to configure a custom consumer group ID in advance for Kafka Consumer sources.
diff --git a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon
index 6f29d7cc8..3ac6a4e2b 100644
--- a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon
+++ b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon
@@ -15,4 +15,9 @@ emqx_bridge_kafka_consumer_schema {
 config_connector.label:
 """Kafka Consumer Client Configuration"""
 
+group_id.desc:
+"""Consumer group identifier to be used for this source. If omitted, one based on the source name is generated automatically."""
+
+group_id.label:
+"""Custom Consumer Group Id"""
 }
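
For reference, here is a minimal HOCON sketch of a Kafka Consumer source that sets the new `group_id` parameter. It is modeled on the `kafka_consumer_source_config/0` test helper above; the connector name, source name, and topic are placeholders rather than values taken from this change. When `group_id` is omitted, `consumer_group_id/2` falls back to the generated `emqx-kafka-consumer-<source name>` group, so all nodes in the cluster still share a single consumer group.

```hocon
# Sketch only: "my_connector", "my_consumer", and the topic are hypothetical names.
sources.kafka_consumer.my_consumer {
  enable = true
  connector = "my_connector"
  parameters {
    topic = "my-topic"
    # New in this change: a pre-defined consumer group id; must be non-empty when set.
    group_id = "my_group_id"
    key_encoding_mode = none
    value_encoding_mode = none
    max_batch_bytes = "896KB"
    max_rejoin_attempts = 5
    offset_reset_policy = latest
  }
  resource_opts {
    health_check_interval = "2s"
  }
}
```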