From 14ef0b1e5189453944829eaafc59dfb1b8133dcd Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 30 Apr 2024 15:23:24 -0300 Subject: [PATCH 1/4] feat(kafka consumer): allow custom group id Fixes https://emqx.atlassian.net/browse/EMQX-12273 Fixes EMQX-12273 When consuming messages in Kafka in Alibaba Cloud, the group needs to be configured in advance, and then the consumer can use the group to consume messages. Automatic group creation is generally not allowed online. --- .../src/emqx_bridge_kafka_consumer_schema.erl | 8 +++++++ .../src/emqx_bridge_kafka_impl_consumer.erl | 11 ++++++---- .../emqx_bridge_kafka_impl_consumer_SUITE.erl | 2 +- .../emqx_bridge_v2_kafka_consumer_SUITE.erl | 21 +++++++++++++++++++ changes/ee/feat-12961.en.md | 1 + .../emqx_bridge_kafka_consumer_schema.hocon | 5 +++++ 6 files changed, 43 insertions(+), 5 deletions(-) create mode 100644 changes/ee/feat-12961.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl index d4e76dfc8..8fe0d6351 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -84,6 +84,14 @@ fields(source_parameters) -> required => true, desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic) } + )}, + {group_id, + mk( + binary(), + #{ + required => false, + desc => ?DESC(group_id) + } )} | Fields ]; diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl index c4f66dfff..2f0aff68d 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_impl_consumer.erl @@ -26,7 +26,7 @@ ]). -ifdef(TEST). --export([consumer_group_id/1]). +-export([consumer_group_id/2]). -endif. -include_lib("emqx/include/logger.hrl"). @@ -50,6 +50,7 @@ parameters := source_parameters() }. -type source_parameters() :: #{ + group_id => binary(), key_encoding_mode := encoding_mode(), max_batch_bytes := emqx_schema:bytesize(), max_rejoin_attempts := non_neg_integer(), @@ -431,7 +432,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) -> %% note: the group id should be the same for all nodes in the %% cluster, so that the load gets distributed between all %% consumers and we don't repeat messages in the same cluster. - GroupID = consumer_group_id(BridgeName), + GroupID = consumer_group_id(Params0, BridgeName), %% earliest or latest BeginOffset = OffsetResetPolicy0, OffsetResetPolicy = @@ -623,8 +624,10 @@ log_when_error(Fun, Log) -> }) end. --spec consumer_group_id(atom() | binary()) -> binary(). -consumer_group_id(BridgeName0) -> +-spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary(). +consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) -> + GroupId; +consumer_group_id(_ConsumerParams, BridgeName0) -> BridgeName = to_bin(BridgeName0), <<"emqx-kafka-consumer-", BridgeName/binary>>. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl index 56aabb1c3..24405eb9e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_consumer_SUITE.erl @@ -1542,7 +1542,7 @@ t_receive_after_recovery(Config) -> _Interval = 500, _NAttempts = 20, begin - GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA), + GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, KafkaNameA), {ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets( KafkaClientId, GroupId ), diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl index 8568e2f62..2f66b9c0e 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -351,3 +351,24 @@ t_bad_bootstrap_host(Config) -> ) ), ok. + +t_custom_group_id(Config) -> + ?check_trace( + begin + #{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config), + CustomGroupId = <<"my_group_id">>, + {ok, {{_, 201, _}, _, _}} = + emqx_bridge_v2_testlib:create_bridge_api( + Config, + #{<<"parameters">> => #{<<"group_id">> => CustomGroupId}} + ), + [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts), + ?assertMatch( + {ok, [{_, CustomGroupId, _}]}, + brod:list_groups(Endpoint, _ConnOpts = #{}) + ), + ok + end, + [] + ), + ok. diff --git a/changes/ee/feat-12961.en.md b/changes/ee/feat-12961.en.md new file mode 100644 index 000000000..f70338751 --- /dev/null +++ b/changes/ee/feat-12961.en.md @@ -0,0 +1 @@ +Added the option to customize group ids in advance for Kafka Consumer sources. diff --git a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon index 6f29d7cc8..cf6e18a68 100644 --- a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon +++ b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon @@ -15,4 +15,9 @@ emqx_bridge_kafka_consumer_schema { config_connector.label: """Kafka Consumer Client Configuration""" + group_id.desc: + """Custom group identifier to be used for this source. If omitted, one based off the source name will be automatically generated.""" + group_id.label: + """Kafka Consumer Custom Group Id""" + } From 3942b371d7a3e6678a49f924bfe03c3c3367a9a2 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Tue, 30 Apr 2024 15:23:24 -0300 Subject: [PATCH 2/4] feat(kafka consumer): allow custom group id Fixes https://emqx.atlassian.net/browse/EMQX-12273 Fixes EMQX-12273 When consuming messages in Kafka in Alibaba Cloud, the group needs to be configured in advance, and then the consumer can use the group to consume messages. Automatic group creation is generally not allowed online. --- .../test/emqx_bridge_v2_kafka_consumer_SUITE.erl | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl index 2f66b9c0e..1372bf71c 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_consumer_SUITE.erl @@ -363,10 +363,14 @@ t_custom_group_id(Config) -> #{<<"parameters">> => #{<<"group_id">> => CustomGroupId}} ), [Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts), - ?assertMatch( - {ok, [{_, CustomGroupId, _}]}, - brod:list_groups(Endpoint, _ConnOpts = #{}) - ), + ?retry(100, 10, begin + {ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}), + ?assertMatch( + [_], + [Group || Group = {_, Id, _} <- Groups, Id == CustomGroupId], + #{groups => Groups} + ) + end), ok end, [] From eb113fa578524e06872c669b2c9be9148988bb84 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 2 May 2024 11:19:00 -0300 Subject: [PATCH 3/4] fix: add non-empty validator --- .../src/emqx_bridge_kafka_consumer_schema.erl | 3 + .../test/emqx_bridge_kafka_tests.erl | 59 +++++++++++++++++-- 2 files changed, 56 insertions(+), 6 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl index 8fe0d6351..a4bb48aec 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka_consumer_schema.erl @@ -90,6 +90,9 @@ fields(source_parameters) -> binary(), #{ required => false, + validator => [ + emqx_resource_validator:not_empty("Group id must not be empty") + ], desc => ?DESC(group_id) } )} diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 54f6f9efc..4de01c98d 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -204,6 +204,41 @@ test_keepalive_validation(Name, Conf) -> [?_assertThrow(_, check(C)) || C <- InvalidConfs] ++ [?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs]. +%% assert compatibility +bridge_schema_json_test() -> + JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()), + Map = emqx_utils_json:decode(JSON), + Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>], + ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)). + +custom_group_id_test() -> + BaseConfig = kafka_consumer_source_config(), + BadSourceConfig = emqx_utils_maps:deep_merge( + BaseConfig, + #{<<"parameters">> => #{<<"group_id">> => <<>>}} + ), + ?assertThrow( + {_, [ + #{ + path := "sources.kafka_consumer.my_consumer.parameters.group_id", + reason := "Group id must not be empty" + } + ]}, + emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, BadSourceConfig) + ), + + CustomId = <<"custom_id">>, + OkSourceConfig = emqx_utils_maps:deep_merge( + BaseConfig, + #{<<"parameters">> => #{<<"group_id">> => CustomId}} + ), + ?assertMatch( + #{<<"parameters">> := #{<<"group_id">> := CustomId}}, + emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, OkSourceConfig) + ), + + ok. + %%=========================================================================== %% Helper functions %%=========================================================================== @@ -355,9 +390,21 @@ kafka_consumer_hocon() -> "\n }" "\n }". -%% assert compatibility -bridge_schema_json_test() -> - JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()), - Map = emqx_utils_json:decode(JSON), - Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>], - ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)). +kafka_consumer_source_config() -> + #{ + <<"enable">> => true, + <<"connector">> => <<"my_connector">>, + <<"parameters">> => + #{ + <<"key_encoding_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_rejoin_attempts">> => <<"5">>, + <<"offset_reset_policy">> => <<"latest">>, + <<"topic">> => <<"please override">>, + <<"value_encoding_mode">> => <<"none">> + }, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"2s">>, + <<"resume_interval">> => <<"2s">> + } + }. From 905d04f1c3bf0148a8dfdc136d514bc9294f0924 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 2 May 2024 11:19:14 -0300 Subject: [PATCH 4/4] docs: improve descriptions --- rel/i18n/emqx_bridge_kafka_consumer_schema.hocon | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon index cf6e18a68..3ac6a4e2b 100644 --- a/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon +++ b/rel/i18n/emqx_bridge_kafka_consumer_schema.hocon @@ -16,8 +16,8 @@ emqx_bridge_kafka_consumer_schema { """Kafka Consumer Client Configuration""" group_id.desc: - """Custom group identifier to be used for this source. If omitted, one based off the source name will be automatically generated.""" + """Consumer group identifier to be used for this source. If omitted, one based off the source name will be automatically generated.""" group_id.label: - """Kafka Consumer Custom Group Id""" + """Custom Consumer Group Id""" }