feat(kafka consumer): allow custom group id

Fixes https://emqx.atlassian.net/browse/EMQX-12273
Fixes EMQX-12273

When consuming messages in Kafka in Alibaba Cloud, the group needs to be configured in
advance, and then the consumer can use the group to consume messages. Automatic group
creation is generally not allowed online.
This commit is contained in:
Thales Macedo Garitezi 2024-04-30 15:23:24 -03:00
parent 437e7968b1
commit 14ef0b1e51
6 changed files with 43 additions and 5 deletions

View File

@ -84,6 +84,14 @@ fields(source_parameters) ->
required => true, required => true,
desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic) desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic)
} }
)},
{group_id,
mk(
binary(),
#{
required => false,
desc => ?DESC(group_id)
}
)} )}
| Fields | Fields
]; ];

View File

@ -26,7 +26,7 @@
]). ]).
-ifdef(TEST). -ifdef(TEST).
-export([consumer_group_id/1]). -export([consumer_group_id/2]).
-endif. -endif.
-include_lib("emqx/include/logger.hrl"). -include_lib("emqx/include/logger.hrl").
@ -50,6 +50,7 @@
parameters := source_parameters() parameters := source_parameters()
}. }.
-type source_parameters() :: #{ -type source_parameters() :: #{
group_id => binary(),
key_encoding_mode := encoding_mode(), key_encoding_mode := encoding_mode(),
max_batch_bytes := emqx_schema:bytesize(), max_batch_bytes := emqx_schema:bytesize(),
max_rejoin_attempts := non_neg_integer(), max_rejoin_attempts := non_neg_integer(),
@ -431,7 +432,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
%% note: the group id should be the same for all nodes in the %% note: the group id should be the same for all nodes in the
%% cluster, so that the load gets distributed between all %% cluster, so that the load gets distributed between all
%% consumers and we don't repeat messages in the same cluster. %% consumers and we don't repeat messages in the same cluster.
GroupID = consumer_group_id(BridgeName), GroupID = consumer_group_id(Params0, BridgeName),
%% earliest or latest %% earliest or latest
BeginOffset = OffsetResetPolicy0, BeginOffset = OffsetResetPolicy0,
OffsetResetPolicy = OffsetResetPolicy =
@ -623,8 +624,10 @@ log_when_error(Fun, Log) ->
}) })
end. end.
-spec consumer_group_id(atom() | binary()) -> binary(). -spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary().
consumer_group_id(BridgeName0) -> consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) ->
GroupId;
consumer_group_id(_ConsumerParams, BridgeName0) ->
BridgeName = to_bin(BridgeName0), BridgeName = to_bin(BridgeName0),
<<"emqx-kafka-consumer-", BridgeName/binary>>. <<"emqx-kafka-consumer-", BridgeName/binary>>.

View File

@ -1542,7 +1542,7 @@ t_receive_after_recovery(Config) ->
_Interval = 500, _Interval = 500,
_NAttempts = 20, _NAttempts = 20,
begin begin
GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA), GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, KafkaNameA),
{ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets( {ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets(
KafkaClientId, GroupId KafkaClientId, GroupId
), ),

View File

@ -351,3 +351,24 @@ t_bad_bootstrap_host(Config) ->
) )
), ),
ok. ok.
t_custom_group_id(Config) ->
?check_trace(
begin
#{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
CustomGroupId = <<"my_group_id">>,
{ok, {{_, 201, _}, _, _}} =
emqx_bridge_v2_testlib:create_bridge_api(
Config,
#{<<"parameters">> => #{<<"group_id">> => CustomGroupId}}
),
[Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
?assertMatch(
{ok, [{_, CustomGroupId, _}]},
brod:list_groups(Endpoint, _ConnOpts = #{})
),
ok
end,
[]
),
ok.

View File

@ -0,0 +1 @@
Added the option to customize group ids in advance for Kafka Consumer sources.

View File

@ -15,4 +15,9 @@ emqx_bridge_kafka_consumer_schema {
config_connector.label: config_connector.label:
"""Kafka Consumer Client Configuration""" """Kafka Consumer Client Configuration"""
group_id.desc:
"""Custom group identifier to be used for this source. If omitted, one based off the source name will be automatically generated."""
group_id.label:
"""Kafka Consumer Custom Group Id"""
} }