test: make bridge name unique in tests

This commit is contained in:
Kjell Winblad 2022-09-15 16:21:32 +02:00
parent 4dc26eeba7
commit be7a8c11a8
1 changed files with 27 additions and 13 deletions

View File

@ -38,23 +38,28 @@ do_publish(Conf, KafkaTopic, InstId) ->
}, },
{ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
ct:pal("base offset before testing ~p", [Offset]), ct:pal("base offset before testing ~p", [Offset]),
{ok, State} = ?PRODUCER:on_start(InstId, Conf), StartRes = ?PRODUCER:on_start(InstId, Conf),
ok = ?PRODUCER:on_query(InstId, {send_message, Msg}, State), {ok, State} = StartRes,
OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
ok = OnQueryRes,
{ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
ok = ?PRODUCER:on_stop(InstId, State), ok = ?PRODUCER:on_stop(InstId, State),
ok. ok.
t_publish(_CtConfig) -> t_publish(_CtConfig) ->
InstId = emqx_bridge_resource:resource_id("kafka", "NoAuthInst"),
KafkaTopic = "test-topic-one-partition", KafkaTopic = "test-topic-one-partition",
Conf = config(#{ Conf = config(#{
"authentication" => "none", "authentication" => "none",
"kafka_hosts_string" => kafka_hosts_string(), "kafka_hosts_string" => kafka_hosts_string(),
"kafka_topic" => KafkaTopic "kafka_topic" => KafkaTopic,
"instance_id" => InstId
}), }),
do_publish(Conf, KafkaTopic, <<"NoAuthInst">>). do_publish(Conf, KafkaTopic, InstId).
t_publish_sasl_plain(_CtConfig) -> t_publish_sasl_plain(_CtConfig) ->
InstId = emqx_bridge_resource:resource_id("kafka", "SASLPlainInst"),
KafkaTopic = "test-topic-one-partition", KafkaTopic = "test-topic-one-partition",
Conf = config(#{ Conf = config(#{
"authentication" => #{ "authentication" => #{
@ -63,11 +68,14 @@ t_publish_sasl_plain(_CtConfig) ->
"password" => "password" "password" => "password"
}, },
"kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_hosts_string" => kafka_hosts_string_sasl(),
"kafka_topic" => KafkaTopic "kafka_topic" => KafkaTopic,
"instance_id" => InstId
}), }),
do_publish(Conf, KafkaTopic, <<"SASLPlainInst">>). do_publish(Conf, KafkaTopic, InstId).
t_publish_sasl_scram256(_CtConfig) -> t_publish_sasl_scram256(_CtConfig) ->
InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram256Inst"),
KafkaTopic = "test-topic-one-partition",
KafkaTopic = "test-topic-one-partition", KafkaTopic = "test-topic-one-partition",
Conf = config(#{ Conf = config(#{
"authentication" => #{ "authentication" => #{
@ -76,11 +84,13 @@ t_publish_sasl_scram256(_CtConfig) ->
"password" => "password" "password" => "password"
}, },
"kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_hosts_string" => kafka_hosts_string_sasl(),
"kafka_topic" => KafkaTopic "kafka_topic" => KafkaTopic,
"instance_id" => InstId
}), }),
do_publish(Conf, KafkaTopic, <<"SASLScram256Inst">>). do_publish(Conf, KafkaTopic, InstId).
t_publish_sasl_scram512(_CtConfig) -> t_publish_sasl_scram512(_CtConfig) ->
InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram512Inst"),
KafkaTopic = "test-topic-one-partition", KafkaTopic = "test-topic-one-partition",
Conf = config(#{ Conf = config(#{
"authentication" => #{ "authentication" => #{
@ -89,11 +99,13 @@ t_publish_sasl_scram512(_CtConfig) ->
"password" => "password" "password" => "password"
}, },
"kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_hosts_string" => kafka_hosts_string_sasl(),
"kafka_topic" => KafkaTopic "kafka_topic" => KafkaTopic,
"instance_id" => InstId
}), }),
do_publish(Conf, KafkaTopic, <<"SASLScram512Inst">>). do_publish(Conf, KafkaTopic, InstId).
t_publish_sasl_kerberos(_CtConfig) -> t_publish_sasl_kerberos(_CtConfig) ->
InstId = emqx_bridge_resource:resource_id("kafka", "SASLKerberosInst"),
KafkaTopic = "test-topic-one-partition", KafkaTopic = "test-topic-one-partition",
Conf = config(#{ Conf = config(#{
"authentication" => #{ "authentication" => #{
@ -101,9 +113,10 @@ t_publish_sasl_kerberos(_CtConfig) ->
"kerberos_keytab_file" => "/var/lib/secret/rig.key" "kerberos_keytab_file" => "/var/lib/secret/rig.key"
}, },
"kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_hosts_string" => kafka_hosts_string_sasl(),
"kafka_topic" => KafkaTopic "kafka_topic" => KafkaTopic,
"instance_id" => InstId
}), }),
do_publish(Conf, KafkaTopic, <<"SASLKerberosInst">>). do_publish(Conf, KafkaTopic, InstId).
config(Args) -> config(Args) ->
{ok, Conf} = hocon:binary(hocon_config(Args)), {ok, Conf} = hocon:binary(hocon_config(Args)),
@ -112,7 +125,8 @@ config(Args) ->
#{<<"config">> => Conf}, #{<<"config">> => Conf},
#{atom_key => true} #{atom_key => true}
), ),
Parsed#{bridge_name => "testbridge"}. InstId = maps:get("instance_id", Args),
Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(InstId))}.
hocon_config(Args) -> hocon_config(Args) ->
AuthConf = maps:get("authentication", Args), AuthConf = maps:get("authentication", Args),