From 9b7e473cf65e4bbff3a2af7880d6c97572a95a5a Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 May 2023 11:58:14 -0300 Subject: [PATCH 1/2] feat(kafka_producer): add validation for empty message key when strategy = key_dispatch Fixes https://emqx.atlassian.net/browse/EMQX-9979 --- .../src/emqx_bridge_kafka.app.src | 2 +- .../src/emqx_bridge_kafka.erl | 11 ++++++- .../test/emqx_bridge_kafka_tests.erl | 30 +++++++++++++++++++ changes/ee/feat-10841.en.md | 1 + 4 files changed, 42 insertions(+), 2 deletions(-) create mode 100644 changes/ee/feat-10841.en.md diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src index 6c103f73b..64811c91c 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_kafka, [ {description, "EMQX Enterprise Kafka Bridge"}, - {vsn, "0.1.2"}, + {vsn, "0.1.3"}, {registered, [emqx_bridge_kafka_consumer_sup]}, {applications, [ kernel, diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 30f6cd60d..73a71787e 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -247,7 +247,8 @@ fields(producer_opts) -> {kafka, mk(ref(producer_kafka_opts), #{ required => true, - desc => ?DESC(producer_kafka_opts) + desc => ?DESC(producer_kafka_opts), + validator => fun producer_strategy_key_validator/1 })} ]; fields(producer_kafka_opts) -> @@ -459,3 +460,11 @@ consumer_topic_mapping_validator(TopicMapping = [_ | _]) -> false -> {error, "Kafka topics must not be repeated in a bridge"} end. + +producer_strategy_key_validator(#{ + <<"partition_strategy">> := key_dispatch, + <<"message">> := #{<<"key">> := ""} +}) -> + {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; +producer_strategy_key_validator(_) -> + ok. diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index b16df854f..23bcf8e9d 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -138,6 +138,36 @@ kafka_consumer_test() -> ok. +message_key_dispatch_validations_test() -> + Conf0 = kafka_producer_new_hocon(), + Conf1 = + Conf0 ++ + "\n" + "bridges.kafka.myproducer.kafka.message.key = \"\"" + "\n" + "bridges.kafka.myproducer.kafka.partition_strategy = \"key_dispatch\"", + Conf = parse(Conf1), + ?assertMatch( + #{ + <<"kafka">> := + #{ + <<"partition_strategy">> := <<"key_dispatch">>, + <<"message">> := #{<<"key">> := <<>>} + } + }, + emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, <<"myproducer">>], Conf) + ), + ?assertThrow( + {_, [ + #{ + path := "bridges.kafka.myproducer.kafka", + reason := "Message key cannot be empty when `key_dispatch` strategy is used" + } + ]}, + check(Conf) + ), + ok. + %%=========================================================================== %% Helper functions %%=========================================================================== diff --git a/changes/ee/feat-10841.en.md b/changes/ee/feat-10841.en.md new file mode 100644 index 000000000..3bf8daa24 --- /dev/null +++ b/changes/ee/feat-10841.en.md @@ -0,0 +1 @@ +Added a schema validation to ensure message key is not empty when dispatching by key in Kafka Producer bridge. From 3edbad9f564d5755796a73d39b28c6921d036aaa Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Fri, 26 May 2023 17:50:32 -0300 Subject: [PATCH 2/2] feat(pulsar_producer): add validation for empty message key when strategy = key_dispatch --- .../src/emqx_bridge_pulsar.erl | 10 +++ .../test/emqx_bridge_pulsar_tests.erl | 75 +++++++++++++++++++ changes/ee/feat-10841.en.md | 2 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 3 +- rebar.config.erl | 1 + 5 files changed, 89 insertions(+), 2 deletions(-) create mode 100644 apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 721937cd2..602e9cfdd 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -18,6 +18,8 @@ %% emqx_ee_bridge "unofficial" API -export([conn_bridge_examples/1]). +-export([producer_strategy_key_validator/1]). + %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API %%------------------------------------------------------------------------------------------------- @@ -218,6 +220,14 @@ conn_bridge_examples(_Method) -> } ]. +producer_strategy_key_validator(#{ + <<"strategy">> := key_dispatch, + <<"message">> := #{<<"key">> := ""} +}) -> + {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; +producer_strategy_key_validator(_) -> + ok. + %%------------------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl new file mode 100644 index 000000000..d46f2af6f --- /dev/null +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pulsar_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%=========================================================================== +%% Test cases +%%=========================================================================== + +pulsar_producer_validations_test() -> + Conf0 = pulsar_producer_hocon(), + Conf1 = + Conf0 ++ + "\n" + "bridges.pulsar_producer.my_producer.strategy = key_dispatch" + "\n" + "bridges.pulsar_producer.my_producer.message.key = \"\"", + Conf = parse(Conf1), + ?assertMatch( + #{ + <<"strategy">> := <<"key_dispatch">>, + <<"message">> := #{<<"key">> := <<>>} + }, + emqx_utils_maps:deep_get([<<"bridges">>, <<"pulsar_producer">>, <<"my_producer">>], Conf) + ), + ?assertThrow( + {_, [ + #{ + path := "bridges.pulsar_producer.my_producer", + reason := "Message key cannot be empty when `key_dispatch` strategy is used" + } + ]}, + check(Conf) + ), + + ok. + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf). + +%%=========================================================================== +%% Data section +%%=========================================================================== + +%% erlfmt-ignore +pulsar_producer_hocon() -> +""" +bridges.pulsar_producer.my_producer { + enable = true + servers = \"localhost:6650\" + pulsar_topic = pulsar_topic + strategy = random + message { + key = \"${.clientid}\" + value = \"${.}\" + } + authentication = none + ssl { + enable = false + verify = verify_none + server_name_indication = \"auto\" + } +} +""". diff --git a/changes/ee/feat-10841.en.md b/changes/ee/feat-10841.en.md index 3bf8daa24..2bcf25c63 100644 --- a/changes/ee/feat-10841.en.md +++ b/changes/ee/feat-10841.en.md @@ -1 +1 @@ -Added a schema validation to ensure message key is not empty when dispatching by key in Kafka Producer bridge. +Added a schema validation to ensure message key is not empty when dispatching by key in Kafka and Pulsar Producer bridges. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 3636e3eb2..be42a913a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -242,7 +242,8 @@ pulsar_structs() -> hoconsc:map(name, ref(emqx_bridge_pulsar, pulsar_producer)), #{ desc => <<"Pulsar Producer Bridge Config">>, - required => false + required => false, + validator => fun emqx_bridge_pulsar:producer_strategy_key_validator/1 } )} ]. diff --git a/rebar.config.erl b/rebar.config.erl index 933e87181..843db9605 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -91,6 +91,7 @@ is_community_umbrella_app("apps/emqx_bridge_matrix") -> false; is_community_umbrella_app("apps/emqx_bridge_mongodb") -> false; is_community_umbrella_app("apps/emqx_bridge_mysql") -> false; is_community_umbrella_app("apps/emqx_bridge_pgsql") -> false; +is_community_umbrella_app("apps/emqx_bridge_pulsar") -> false; is_community_umbrella_app("apps/emqx_bridge_redis") -> false; is_community_umbrella_app("apps/emqx_bridge_rocketmq") -> false; is_community_umbrella_app("apps/emqx_bridge_tdengine") -> false;