feat(pulsar_producer): add validation for empty message key when strategy = key_dispatch

This commit is contained in:
Thales Macedo Garitezi 2023-05-26 17:50:32 -03:00
parent 9b7e473cf6
commit 3edbad9f56
5 changed files with 89 additions and 2 deletions

View File

@ -18,6 +18,8 @@
%% emqx_ee_bridge "unofficial" API
-export([conn_bridge_examples/1]).
-export([producer_strategy_key_validator/1]).
%%-------------------------------------------------------------------------------------------------
%% `hocon_schema' API
%%-------------------------------------------------------------------------------------------------
@ -218,6 +220,14 @@ conn_bridge_examples(_Method) ->
}
].
producer_strategy_key_validator(#{
<<"strategy">> := key_dispatch,
<<"message">> := #{<<"key">> := ""}
}) ->
{error, "Message key cannot be empty when `key_dispatch` strategy is used"};
producer_strategy_key_validator(_) ->
ok.
%%-------------------------------------------------------------------------------------------------
%% Internal fns
%%-------------------------------------------------------------------------------------------------

View File

@ -0,0 +1,75 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_pulsar_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Test cases
%%===========================================================================
pulsar_producer_validations_test() ->
Conf0 = pulsar_producer_hocon(),
Conf1 =
Conf0 ++
"\n"
"bridges.pulsar_producer.my_producer.strategy = key_dispatch"
"\n"
"bridges.pulsar_producer.my_producer.message.key = \"\"",
Conf = parse(Conf1),
?assertMatch(
#{
<<"strategy">> := <<"key_dispatch">>,
<<"message">> := #{<<"key">> := <<>>}
},
emqx_utils_maps:deep_get([<<"bridges">>, <<"pulsar_producer">>, <<"my_producer">>], Conf)
),
?assertThrow(
{_, [
#{
path := "bridges.pulsar_producer.my_producer",
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
}
]},
check(Conf)
),
ok.
%%===========================================================================
%% Helper functions
%%===========================================================================
parse(Hocon) ->
{ok, Conf} = hocon:binary(Hocon),
Conf.
check(Conf) when is_map(Conf) ->
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%%===========================================================================
%% Data section
%%===========================================================================
%% erlfmt-ignore
pulsar_producer_hocon() ->
"""
bridges.pulsar_producer.my_producer {
enable = true
servers = \"localhost:6650\"
pulsar_topic = pulsar_topic
strategy = random
message {
key = \"${.clientid}\"
value = \"${.}\"
}
authentication = none
ssl {
enable = false
verify = verify_none
server_name_indication = \"auto\"
}
}
""".

View File

@ -1 +1 @@
Added a schema validation to ensure message key is not empty when dispatching by key in Kafka Producer bridge.
Added a schema validation to ensure message key is not empty when dispatching by key in Kafka and Pulsar Producer bridges.

View File

@ -242,7 +242,8 @@ pulsar_structs() ->
hoconsc:map(name, ref(emqx_bridge_pulsar, pulsar_producer)),
#{
desc => <<"Pulsar Producer Bridge Config">>,
required => false
required => false,
validator => fun emqx_bridge_pulsar:producer_strategy_key_validator/1
}
)}
].

View File

@ -91,6 +91,7 @@ is_community_umbrella_app("apps/emqx_bridge_matrix") -> false;
is_community_umbrella_app("apps/emqx_bridge_mongodb") -> false;
is_community_umbrella_app("apps/emqx_bridge_mysql") -> false;
is_community_umbrella_app("apps/emqx_bridge_pgsql") -> false;
is_community_umbrella_app("apps/emqx_bridge_pulsar") -> false;
is_community_umbrella_app("apps/emqx_bridge_redis") -> false;
is_community_umbrella_app("apps/emqx_bridge_rocketmq") -> false;
is_community_umbrella_app("apps/emqx_bridge_tdengine") -> false;