Merge pull request #12961 from thalesmg/kconsu-custom-group-id-m-20240430
feat(kafka consumer): allow custom group id
This commit is contained in:
commit
c71f73924b
|
@ -84,6 +84,17 @@ fields(source_parameters) ->
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic)
|
desc => ?DESC(emqx_bridge_kafka, consumer_kafka_topic)
|
||||||
}
|
}
|
||||||
|
)},
|
||||||
|
{group_id,
|
||||||
|
mk(
|
||||||
|
binary(),
|
||||||
|
#{
|
||||||
|
required => false,
|
||||||
|
validator => [
|
||||||
|
emqx_resource_validator:not_empty("Group id must not be empty")
|
||||||
|
],
|
||||||
|
desc => ?DESC(group_id)
|
||||||
|
}
|
||||||
)}
|
)}
|
||||||
| Fields
|
| Fields
|
||||||
];
|
];
|
||||||
|
|
|
@ -26,7 +26,7 @@
|
||||||
]).
|
]).
|
||||||
|
|
||||||
-ifdef(TEST).
|
-ifdef(TEST).
|
||||||
-export([consumer_group_id/1]).
|
-export([consumer_group_id/2]).
|
||||||
-endif.
|
-endif.
|
||||||
|
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
@ -50,6 +50,7 @@
|
||||||
parameters := source_parameters()
|
parameters := source_parameters()
|
||||||
}.
|
}.
|
||||||
-type source_parameters() :: #{
|
-type source_parameters() :: #{
|
||||||
|
group_id => binary(),
|
||||||
key_encoding_mode := encoding_mode(),
|
key_encoding_mode := encoding_mode(),
|
||||||
max_batch_bytes := emqx_schema:bytesize(),
|
max_batch_bytes := emqx_schema:bytesize(),
|
||||||
max_rejoin_attempts := non_neg_integer(),
|
max_rejoin_attempts := non_neg_integer(),
|
||||||
|
@ -431,7 +432,7 @@ start_consumer(Config, ConnectorResId, SourceResId, ClientID) ->
|
||||||
%% note: the group id should be the same for all nodes in the
|
%% note: the group id should be the same for all nodes in the
|
||||||
%% cluster, so that the load gets distributed between all
|
%% cluster, so that the load gets distributed between all
|
||||||
%% consumers and we don't repeat messages in the same cluster.
|
%% consumers and we don't repeat messages in the same cluster.
|
||||||
GroupID = consumer_group_id(BridgeName),
|
GroupID = consumer_group_id(Params0, BridgeName),
|
||||||
%% earliest or latest
|
%% earliest or latest
|
||||||
BeginOffset = OffsetResetPolicy0,
|
BeginOffset = OffsetResetPolicy0,
|
||||||
OffsetResetPolicy =
|
OffsetResetPolicy =
|
||||||
|
@ -623,8 +624,10 @@ log_when_error(Fun, Log) ->
|
||||||
})
|
})
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec consumer_group_id(atom() | binary()) -> binary().
|
-spec consumer_group_id(#{group_id => binary(), any() => term()}, atom() | binary()) -> binary().
|
||||||
consumer_group_id(BridgeName0) ->
|
consumer_group_id(#{group_id := GroupId}, _BridgeName) when is_binary(GroupId) ->
|
||||||
|
GroupId;
|
||||||
|
consumer_group_id(_ConsumerParams, BridgeName0) ->
|
||||||
BridgeName = to_bin(BridgeName0),
|
BridgeName = to_bin(BridgeName0),
|
||||||
<<"emqx-kafka-consumer-", BridgeName/binary>>.
|
<<"emqx-kafka-consumer-", BridgeName/binary>>.
|
||||||
|
|
||||||
|
|
|
@ -1542,7 +1542,7 @@ t_receive_after_recovery(Config) ->
|
||||||
_Interval = 500,
|
_Interval = 500,
|
||||||
_NAttempts = 20,
|
_NAttempts = 20,
|
||||||
begin
|
begin
|
||||||
GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(KafkaNameA),
|
GroupId = emqx_bridge_kafka_impl_consumer:consumer_group_id(#{}, KafkaNameA),
|
||||||
{ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets(
|
{ok, [#{partitions := Partitions}]} = brod:fetch_committed_offsets(
|
||||||
KafkaClientId, GroupId
|
KafkaClientId, GroupId
|
||||||
),
|
),
|
||||||
|
|
|
@ -204,6 +204,41 @@ test_keepalive_validation(Name, Conf) ->
|
||||||
[?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
|
[?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
|
||||||
[?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].
|
[?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].
|
||||||
|
|
||||||
|
%% assert compatibility
|
||||||
|
bridge_schema_json_test() ->
|
||||||
|
JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
|
||||||
|
Map = emqx_utils_json:decode(JSON),
|
||||||
|
Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
|
||||||
|
?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
|
||||||
|
|
||||||
|
custom_group_id_test() ->
|
||||||
|
BaseConfig = kafka_consumer_source_config(),
|
||||||
|
BadSourceConfig = emqx_utils_maps:deep_merge(
|
||||||
|
BaseConfig,
|
||||||
|
#{<<"parameters">> => #{<<"group_id">> => <<>>}}
|
||||||
|
),
|
||||||
|
?assertThrow(
|
||||||
|
{_, [
|
||||||
|
#{
|
||||||
|
path := "sources.kafka_consumer.my_consumer.parameters.group_id",
|
||||||
|
reason := "Group id must not be empty"
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, BadSourceConfig)
|
||||||
|
),
|
||||||
|
|
||||||
|
CustomId = <<"custom_id">>,
|
||||||
|
OkSourceConfig = emqx_utils_maps:deep_merge(
|
||||||
|
BaseConfig,
|
||||||
|
#{<<"parameters">> => #{<<"group_id">> => CustomId}}
|
||||||
|
),
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"parameters">> := #{<<"group_id">> := CustomId}},
|
||||||
|
emqx_bridge_v2_testlib:parse_and_check(source, kafka_consumer, my_consumer, OkSourceConfig)
|
||||||
|
),
|
||||||
|
|
||||||
|
ok.
|
||||||
|
|
||||||
%%===========================================================================
|
%%===========================================================================
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%===========================================================================
|
%%===========================================================================
|
||||||
|
@ -355,9 +390,21 @@ kafka_consumer_hocon() ->
|
||||||
"\n }"
|
"\n }"
|
||||||
"\n }".
|
"\n }".
|
||||||
|
|
||||||
%% assert compatibility
|
kafka_consumer_source_config() ->
|
||||||
bridge_schema_json_test() ->
|
#{
|
||||||
JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
|
<<"enable">> => true,
|
||||||
Map = emqx_utils_json:decode(JSON),
|
<<"connector">> => <<"my_connector">>,
|
||||||
Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
|
<<"parameters">> =>
|
||||||
?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
|
#{
|
||||||
|
<<"key_encoding_mode">> => <<"none">>,
|
||||||
|
<<"max_batch_bytes">> => <<"896KB">>,
|
||||||
|
<<"max_rejoin_attempts">> => <<"5">>,
|
||||||
|
<<"offset_reset_policy">> => <<"latest">>,
|
||||||
|
<<"topic">> => <<"please override">>,
|
||||||
|
<<"value_encoding_mode">> => <<"none">>
|
||||||
|
},
|
||||||
|
<<"resource_opts">> => #{
|
||||||
|
<<"health_check_interval">> => <<"2s">>,
|
||||||
|
<<"resume_interval">> => <<"2s">>
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
|
@ -351,3 +351,28 @@ t_bad_bootstrap_host(Config) ->
|
||||||
)
|
)
|
||||||
),
|
),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
t_custom_group_id(Config) ->
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
#{<<"bootstrap_hosts">> := BootstrapHosts} = ?config(connector_config, Config),
|
||||||
|
CustomGroupId = <<"my_group_id">>,
|
||||||
|
{ok, {{_, 201, _}, _, _}} =
|
||||||
|
emqx_bridge_v2_testlib:create_bridge_api(
|
||||||
|
Config,
|
||||||
|
#{<<"parameters">> => #{<<"group_id">> => CustomGroupId}}
|
||||||
|
),
|
||||||
|
[Endpoint] = emqx_bridge_kafka_impl:hosts(BootstrapHosts),
|
||||||
|
?retry(100, 10, begin
|
||||||
|
{ok, Groups} = brod:list_groups(Endpoint, _ConnOpts = #{}),
|
||||||
|
?assertMatch(
|
||||||
|
[_],
|
||||||
|
[Group || Group = {_, Id, _} <- Groups, Id == CustomGroupId],
|
||||||
|
#{groups => Groups}
|
||||||
|
)
|
||||||
|
end),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
[]
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Added the option to customize group ids in advance for Kafka Consumer sources.
|
|
@ -15,4 +15,9 @@ emqx_bridge_kafka_consumer_schema {
|
||||||
config_connector.label:
|
config_connector.label:
|
||||||
"""Kafka Consumer Client Configuration"""
|
"""Kafka Consumer Client Configuration"""
|
||||||
|
|
||||||
|
group_id.desc:
|
||||||
|
"""Consumer group identifier to be used for this source. If omitted, one based off the source name will be automatically generated."""
|
||||||
|
group_id.label:
|
||||||
|
"""Custom Consumer Group Id"""
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue