From 9b7e473cf65e4bbff3a2af7880d6c97572a95a5a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 May 2023 11:58:14 -0300 Subject: [PATCH] feat(kafka_producer): add validation for empty message key when strategy = key_dispatch Fixes https://emqx.atlassian.net/browse/EMQX-9979 --- .../src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka.erl | 11 ++++++- .../test/emqx_bridge_kafka_tests.erl | 30 +++++++++++++++++++ changes/ee/feat-10841.en.md | 1 + 4 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 changes/ee/feat-10841.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 6c103f73b..64811c91c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 30f6cd60d..73a71787e 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -247,7 +247,8 @@ fields(producer_opts) -> {kafka, mk(ref(producer_kafka_opts), #{ required => true, - desc => ?DESC(producer_kafka_opts) + desc => ?DESC(producer_kafka_opts), + validator => fun producer_strategy_key_validator/1 })} ]; fields(producer_kafka_opts) -> @@ -459,3 +460,11 @@ consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> false -> {error, "Kafka topics must not be repeated in a bridge"} end. + +producer_strategy_key_validator(#{ + <<"partition_strategy">> := key_dispatch, + <<"message">> := #{<<"key">> := ""} +}) -> + {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; +producer_strategy_key_validator(_) -> + ok. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index b16df854f..23bcf8e9d 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -138,6 +138,36 @@ kafka_consumer_test() -> ok. +message_key_dispatch_validations_test() -> + Conf0 = kafka_producer_new_hocon(), + Conf1 = + Conf0 ++ + "\n" + "bridges.kafka.myproducer.kafka.message.key = \"\"" + "\n" + "bridges.kafka.myproducer.kafka.partition_strategy = \"key_dispatch\"", + Conf = parse(Conf1), + ?assertMatch( + #{ + <<"kafka">> := + #{ + <<"partition_strategy">> := <<"key_dispatch">>, + <<"message">> := #{<<"key">> := <<>>} + } + }, + emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, <<"myproducer">>], Conf) + ), + ?assertThrow( + {_, [ + #{ + path := "bridges.kafka.myproducer.kafka", + reason := "Message key cannot be empty when `key_dispatch` strategy is used" + } + ]}, + check(Conf) + ), + ok. + %%=========================================================================== %% Helper functions %%=========================================================================== diff --git a/changes/ee/feat-10841.en.md b/changes/ee/feat-10841.en.md new file mode 100644 index 000000000..3bf8daa24 --- /dev/null +++ b/changes/ee/feat-10841.en.md @@ -0,0 +1 @@ +Added a schema validation to ensure message key is not empty when dispatching by key in Kafka Producer bridge.