feat(kafka_producer): add validation for empty message key when strategy = key_dispatch
Fixes https://emqx.atlassian.net/browse/EMQX-9979
This commit is contained in:
parent
fe81e9521a
commit
9b7e473cf6
|
@ -1,6 +1,6 @@
|
||||||
{application, emqx_bridge_kafka, [
|
{application, emqx_bridge_kafka, [
|
||||||
{description, "EMQX Enterprise Kafka Bridge"},
|
{description, "EMQX Enterprise Kafka Bridge"},
|
||||||
{vsn, "0.1.2"},
|
{vsn, "0.1.3"},
|
||||||
{registered, [emqx_bridge_kafka_consumer_sup]},
|
{registered, [emqx_bridge_kafka_consumer_sup]},
|
||||||
{applications, [
|
{applications, [
|
||||||
kernel,
|
kernel,
|
||||||
|
|
|
@ -247,7 +247,8 @@ fields(producer_opts) ->
|
||||||
{kafka,
|
{kafka,
|
||||||
mk(ref(producer_kafka_opts), #{
|
mk(ref(producer_kafka_opts), #{
|
||||||
required => true,
|
required => true,
|
||||||
desc => ?DESC(producer_kafka_opts)
|
desc => ?DESC(producer_kafka_opts),
|
||||||
|
validator => fun producer_strategy_key_validator/1
|
||||||
})}
|
})}
|
||||||
];
|
];
|
||||||
fields(producer_kafka_opts) ->
|
fields(producer_kafka_opts) ->
|
||||||
|
@ -459,3 +460,11 @@ consumer_topic_mapping_validator(TopicMapping = [_ | _]) ->
|
||||||
false ->
|
false ->
|
||||||
{error, "Kafka topics must not be repeated in a bridge"}
|
{error, "Kafka topics must not be repeated in a bridge"}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
producer_strategy_key_validator(#{
|
||||||
|
<<"partition_strategy">> := key_dispatch,
|
||||||
|
<<"message">> := #{<<"key">> := ""}
|
||||||
|
}) ->
|
||||||
|
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
|
||||||
|
producer_strategy_key_validator(_) ->
|
||||||
|
ok.
|
||||||
|
|
|
@ -138,6 +138,36 @@ kafka_consumer_test() ->
|
||||||
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
|
message_key_dispatch_validations_test() ->
|
||||||
|
Conf0 = kafka_producer_new_hocon(),
|
||||||
|
Conf1 =
|
||||||
|
Conf0 ++
|
||||||
|
"\n"
|
||||||
|
"bridges.kafka.myproducer.kafka.message.key = \"\""
|
||||||
|
"\n"
|
||||||
|
"bridges.kafka.myproducer.kafka.partition_strategy = \"key_dispatch\"",
|
||||||
|
Conf = parse(Conf1),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
<<"kafka">> :=
|
||||||
|
#{
|
||||||
|
<<"partition_strategy">> := <<"key_dispatch">>,
|
||||||
|
<<"message">> := #{<<"key">> := <<>>}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, <<"myproducer">>], Conf)
|
||||||
|
),
|
||||||
|
?assertThrow(
|
||||||
|
{_, [
|
||||||
|
#{
|
||||||
|
path := "bridges.kafka.myproducer.kafka",
|
||||||
|
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
|
||||||
|
}
|
||||||
|
]},
|
||||||
|
check(Conf)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
%%===========================================================================
|
%%===========================================================================
|
||||||
%% Helper functions
|
%% Helper functions
|
||||||
%%===========================================================================
|
%%===========================================================================
|
||||||
|
|
|
@ -0,0 +1 @@
|
||||||
|
Added a schema validation to ensure message key is not empty when dispatching by key in Kafka Producer bridge.
|
Loading…
Reference in New Issue