test: add coverage for key_dispatch partition strategy

This commit is contained in:
Zaiming (Stone) Shi 2023-01-18 10:05:31 +01:00
parent 3872c4451f
commit 8f275a66d0
1 changed files with 50 additions and 19 deletions

View File

@ -109,6 +109,9 @@ set_special_configs(_) ->
t_publish_no_auth(_CtConfig) -> t_publish_no_auth(_CtConfig) ->
publish_with_and_without_ssl("none"). publish_with_and_without_ssl("none").
t_publish_no_auth_key_dispatch(_CtConfig) ->
publish_with_and_without_ssl("none", #{"partition_strategy" => "key_dispatch"}).
t_publish_sasl_plain(_CtConfig) -> t_publish_sasl_plain(_CtConfig) ->
publish_with_and_without_ssl(valid_sasl_plain_settings()). publish_with_and_without_ssl(valid_sasl_plain_settings()).
@ -404,20 +407,35 @@ t_failed_creation_then_fix(_Config) ->
%%------------------------------------------------------------------------------ %%------------------------------------------------------------------------------
publish_with_and_without_ssl(AuthSettings) -> publish_with_and_without_ssl(AuthSettings) ->
publish_helper(#{ publish_with_and_without_ssl(AuthSettings, #{}).
publish_with_and_without_ssl(AuthSettings, Config) ->
publish_helper(
#{
auth_settings => AuthSettings, auth_settings => AuthSettings,
ssl_settings => #{} ssl_settings => #{}
}), },
publish_helper(#{ Config
),
publish_helper(
#{
auth_settings => AuthSettings, auth_settings => AuthSettings,
ssl_settings => valid_ssl_settings() ssl_settings => valid_ssl_settings()
}), },
Config
),
ok. ok.
publish_helper(#{ publish_helper(AuthSettings) ->
publish_helper(AuthSettings, #{}).
publish_helper(
#{
auth_settings := AuthSettings, auth_settings := AuthSettings,
ssl_settings := SSLSettings ssl_settings := SSLSettings
}) -> },
Conf0
) ->
HostsString = HostsString =
case {AuthSettings, SSLSettings} of case {AuthSettings, SSLSettings} of
{"none", Map} when map_size(Map) =:= 0 -> {"none", Map} when map_size(Map) =:= 0 ->
@ -434,13 +452,17 @@ publish_helper(#{
InstId = emqx_bridge_resource:resource_id("kafka", Name), InstId = emqx_bridge_resource:resource_id("kafka", Name),
BridgeId = emqx_bridge_resource:bridge_id("kafka", Name), BridgeId = emqx_bridge_resource:bridge_id("kafka", Name),
KafkaTopic = "test-topic-one-partition", KafkaTopic = "test-topic-one-partition",
Conf = config(#{ Conf = config(
#{
"authentication" => AuthSettings, "authentication" => AuthSettings,
"kafka_hosts_string" => HostsString, "kafka_hosts_string" => HostsString,
"kafka_topic" => KafkaTopic, "kafka_topic" => KafkaTopic,
"instance_id" => InstId, "instance_id" => InstId,
"ssl" => SSLSettings "ssl" => SSLSettings
}), },
Conf0
),
emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}), emqx_bridge_resource:create(kafka, erlang:list_to_atom(Name), Conf, #{}),
%% To make sure we get unique value %% To make sure we get unique value
timer:sleep(1), timer:sleep(1),
@ -463,7 +485,15 @@ publish_helper(#{
ok = emqx_bridge_resource:remove(BridgeId), ok = emqx_bridge_resource:remove(BridgeId),
ok. ok.
default_config() ->
#{"partition_strategy" => "random"}.
config(Args) -> config(Args) ->
config(Args, #{}).
config(Args0, More) ->
Args1 = maps:merge(default_config(), Args0),
Args = maps:merge(Args1, More),
ConfText = hocon_config(Args), ConfText = hocon_config(Args),
ct:pal("Running tests with conf:\n~s", [ConfText]), ct:pal("Running tests with conf:\n~s", [ConfText]),
{ok, Conf} = hocon:binary(ConfText), {ok, Conf} = hocon:binary(ConfText),
@ -506,6 +536,7 @@ producer = {
kafka = { kafka = {
topic = \"{{ kafka_topic }}\" topic = \"{{ kafka_topic }}\"
message = {key = \"${clientid}\", value = \"${.payload}\"} message = {key = \"${clientid}\", value = \"${.payload}\"}
partition_strategy = {{ partition_strategy }}
} }
} }
""". """.