diff --git a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl index 721937cd2..602e9cfdd 100644 --- a/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl +++ b/apps/emqx_bridge_pulsar/src/emqx_bridge_pulsar.erl @@ -18,6 +18,8 @@ %% emqx_ee_bridge "unofficial" API -export([conn_bridge_examples/1]). +-export([producer_strategy_key_validator/1]). + %%------------------------------------------------------------------------------------------------- %% `hocon_schema' API %%------------------------------------------------------------------------------------------------- @@ -218,6 +220,14 @@ conn_bridge_examples(_Method) -> } ]. +producer_strategy_key_validator(#{ + <<"strategy">> := key_dispatch, + <<"message">> := #{<<"key">> := ""} +}) -> + {error, "Message key cannot be empty when `key_dispatch` strategy is used"}; +producer_strategy_key_validator(_) -> + ok. + %%------------------------------------------------------------------------------------------------- %% Internal fns %%------------------------------------------------------------------------------------------------- diff --git a/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl new file mode 100644 index 000000000..d46f2af6f --- /dev/null +++ b/apps/emqx_bridge_pulsar/test/emqx_bridge_pulsar_tests.erl @@ -0,0 +1,75 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_pulsar_tests). + +-include_lib("eunit/include/eunit.hrl"). + +%%=========================================================================== +%% Test cases +%%=========================================================================== + +pulsar_producer_validations_test() -> + Conf0 = pulsar_producer_hocon(), + Conf1 = + Conf0 ++ + "\n" + "bridges.pulsar_producer.my_producer.strategy = key_dispatch" + "\n" + "bridges.pulsar_producer.my_producer.message.key = \"\"", + Conf = parse(Conf1), + ?assertMatch( + #{ + <<"strategy">> := <<"key_dispatch">>, + <<"message">> := #{<<"key">> := <<>>} + }, + emqx_utils_maps:deep_get([<<"bridges">>, <<"pulsar_producer">>, <<"my_producer">>], Conf) + ), + ?assertThrow( + {_, [ + #{ + path := "bridges.pulsar_producer.my_producer", + reason := "Message key cannot be empty when `key_dispatch` strategy is used" + } + ]}, + check(Conf) + ), + + ok. + +%%=========================================================================== +%% Helper functions +%%=========================================================================== + +parse(Hocon) -> + {ok, Conf} = hocon:binary(Hocon), + Conf. + +check(Conf) when is_map(Conf) -> + hocon_tconf:check_plain(emqx_bridge_schema, Conf). + +%%=========================================================================== +%% Data section +%%=========================================================================== + +%% erlfmt-ignore +pulsar_producer_hocon() -> +""" +bridges.pulsar_producer.my_producer { + enable = true + servers = \"localhost:6650\" + pulsar_topic = pulsar_topic + strategy = random + message { + key = \"${.clientid}\" + value = \"${.}\" + } + authentication = none + ssl { + enable = false + verify = verify_none + server_name_indication = \"auto\" + } +} +""". diff --git a/changes/ee/feat-10841.en.md b/changes/ee/feat-10841.en.md index 3bf8daa24..2bcf25c63 100644 --- a/changes/ee/feat-10841.en.md +++ b/changes/ee/feat-10841.en.md @@ -1 +1 @@ -Added a schema validation to ensure message key is not empty when dispatching by key in Kafka Producer bridge. +Added a schema validation to ensure message key is not empty when dispatching by key in Kafka and Pulsar Producer bridges. diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 3636e3eb2..be42a913a 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -242,7 +242,8 @@ pulsar_structs() -> hoconsc:map(name, ref(emqx_bridge_pulsar, pulsar_producer)), #{ desc => <<"Pulsar Producer Bridge Config">>, - required => false + required => false, + validator => fun emqx_bridge_pulsar:producer_strategy_key_validator/1 } )} ]. diff --git a/rebar.config.erl b/rebar.config.erl index 933e87181..843db9605 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -91,6 +91,7 @@ is_community_umbrella_app("apps/emqx_bridge_matrix") -> false; is_community_umbrella_app("apps/emqx_bridge_mongodb") -> false; is_community_umbrella_app("apps/emqx_bridge_mysql") -> false; is_community_umbrella_app("apps/emqx_bridge_pgsql") -> false; +is_community_umbrella_app("apps/emqx_bridge_pulsar") -> false; is_community_umbrella_app("apps/emqx_bridge_redis") -> false; is_community_umbrella_app("apps/emqx_bridge_rocketmq") -> false; is_community_umbrella_app("apps/emqx_bridge_tdengine") -> false;