diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 2d67a9941..14567dd39 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -109,6 +109,9 @@ set_special_configs(_) -> t_publish_no_auth(_CtConfig) -> publish_with_and_without_ssl("none"). +t_publish_no_auth_key_dispatch(_CtConfig) -> + publish_with_and_without_ssl("none", #{"partition_strategy" => "key_dispatch"}). + t_publish_sasl_plain(_CtConfig) -> publish_with_and_without_ssl(valid_sasl_plain_settings()). @@ -404,20 +407,35 @@ t_failed_creation_then_fix(_Config) -> %%------------------------------------------------------------------------------ publish_with_and_without_ssl(AuthSettings) -> - publish_helper(#{ - auth_settings => AuthSettings, - ssl_settings => #{} - }), - publish_helper(#{ - auth_settings => AuthSettings, - ssl_settings => valid_ssl_settings() - }), + publish_with_and_without_ssl(AuthSettings, #{}). + +publish_with_and_without_ssl(AuthSettings, Config) -> + publish_helper( + #{ + auth_settings => AuthSettings, + ssl_settings => #{} + }, + Config + ), + publish_helper( + #{ + auth_settings => AuthSettings, + ssl_settings => valid_ssl_settings() + }, + Config + ), ok. -publish_helper(#{ - auth_settings := AuthSettings, - ssl_settings := SSLSettings -}) -> +publish_helper(AuthSettings) -> + publish_helper(AuthSettings, #{}). + +publish_helper( + #{ + auth_settings := AuthSettings, + ssl_settings := SSLSettings + }, + Conf0 +) -> HostsString = case {AuthSettings, SSLSettings} of {"none", Map} when map_size(Map) =:= 0 -> @@ -434,13 +452,17 @@ publish_helper(#{ InstId = emqx_bridge_resource:resource_id("kafka", Name), BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => AuthSettings, - "kafka_hosts_string" => HostsString, - "kafka_topic" => KafkaTopic, - "instance_id" => InstId, - "ssl" => SSLSettings - }), + Conf = config( + #{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => InstId, + "ssl" => SSLSettings + }, + Conf0 + ), + emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}), %% To make sure we get unique value timer:sleep(1), @@ -463,7 +485,15 @@ publish_helper(#{ ok = emqx_bridge_resource:remove(BridgeId), ok. +default_config() -> + #{"partition_strategy" => "random"}. + config(Args) -> + config(Args, #{}). + +config(Args0, More) -> + Args1 = maps:merge(default_config(), Args0), + Args = maps:merge(Args1, More), ConfText = hocon_config(Args), ct:pal("Running tests with conf:\n~s", [ConfText]), {ok, Conf} = hocon:binary(ConfText), @@ -506,6 +536,7 @@ producer = { kafka = { topic = \"{{ kafka_topic }}\" message = {key = \"${clientid}\", value = \"${.payload}\"} + partition_strategy = {{ partition_strategy }} } } """.