%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_tests).
-include_lib("eunit/include/eunit.hrl").
%%===========================================================================
%% Test cases
%%===========================================================================
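%% The legacy producer schema nests the Kafka options (and the `mqtt` block that
%% becomes `local_topic`) under a `producer` wrapper, while the current schema is
%% flat.  Both layouts must pass the bridge schema check, and `local_topic` must
%% appear in the checked config only when the legacy `mqtt.topic` was set.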
kafka_producer_test() ->
    Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)),
    Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)),
    Conf3 = parse(kafka_producer_new_hocon()),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{<<"kafka">> := #{}}
                        }
                }
        },
        check(Conf1)
    ),
    ?assertNotMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{<<"local_topic">> := _}
                        }
                }
        },
        check(Conf1)
    ),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{
                                    <<"kafka">> := #{},
                                    <<"local_topic">> := <<"mqtt/local">>
                                }
                        }
                }
        },
        check(Conf2)
    ),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{
                                    <<"kafka">> := #{},
                                    <<"local_topic">> := <<"mqtt/local">>
                                }
                        }
                }
        },
        check(Conf3)
    ),
    ok.
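
%% A complete consumer config must pass the schema check; the `topic_mapping`
%% validator must reject duplicate Kafka topics as well as an empty mapping list.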
kafka_consumer_test() ->
    Conf1 = parse(kafka_consumer_hocon()),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka_consumer">> :=
                        #{
                            <<"my_consumer">> := _
                        }
                }
        },
        check(Conf1)
    ),
    %% Bad: can't repeat kafka topics.
    BadConf1 = emqx_utils_maps:deep_put(
        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
        Conf1,
        [
            #{
                <<"kafka_topic">> => <<"t1">>,
                <<"mqtt_topic">> => <<"mqtt/t1">>,
                <<"qos">> => 1,
                <<"payload_template">> => <<"${.}">>
            },
            #{
                <<"kafka_topic">> => <<"t1">>,
                <<"mqtt_topic">> => <<"mqtt/t2">>,
                <<"qos">> => 2,
                <<"payload_template">> => <<"v = ${.value}">>
            }
        ]
    ),
    ?assertThrow(
        {_, [
            #{
                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
                reason := "Kafka topics must not be repeated in a bridge"
            }
        ]},
        check(BadConf1)
    ),
    %% Bad: there must be at least 1 mapping.
    BadConf2 = emqx_utils_maps:deep_put(
        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
        Conf1,
        []
    ),
    ?assertThrow(
        {_, [
            #{
                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
                reason := "There must be at least one Kafka-MQTT topic mapping"
            }
        ]},
        check(BadConf2)
    ),
    ok.
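
%% The producer schema forbids an empty message key template when the
%% `key_dispatch` partition strategy is selected.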
message_key_dispatch_validations_test() ->
    Conf0 = kafka_producer_new_hocon(),
    Conf1 =
        Conf0 ++
            "\n"
            "bridges.kafka.myproducer.kafka.message.key = \"\""
            "\n"
            "bridges.kafka.myproducer.kafka.partition_strategy = \"key_dispatch\"",
    Conf = parse(Conf1),
    ?assertMatch(
        #{
            <<"kafka">> :=
                #{
                    <<"partition_strategy">> := <<"key_dispatch">>,
                    <<"message">> := #{<<"key">> := <<>>}
                }
        },
        emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, <<"myproducer">>], Conf)
    ),
    ?assertThrow(
        {_, [
            #{
                path := "bridges.kafka.myproducer.kafka",
                reason := "Message key cannot be empty when `key_dispatch` strategy is used"
            }
        ]},
        check(Conf)
    ),
    ok.
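
%% `socket_opts.tcp_keepalive` accepts either "none" or an `Idle,Interval,Probes`
%% triple of comma-separated values; malformed or out-of-range triples are rejected.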
tcp_keepalive_validation_test_() ->
    ProducerConf = parse(kafka_producer_new_hocon()),
    ConsumerConf = parse(kafka_consumer_hocon()),
    test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
        test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).

test_keepalive_validation(Name, Conf) ->
    Path = [<<"bridges">>] ++ Name ++ [<<"socket_opts">>, <<"tcp_keepalive">>],
    Conf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,7">>),
    Conf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"none">>),
    ValidConfs = [Conf, Conf1, Conf2],
    InvalidConf = emqx_utils_maps:deep_force_put(Path, Conf, <<"invalid">>),
    InvalidConf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6">>),
    InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>),
    InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2],
    [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++
        [?_assertThrow(_, check(C)) || C <- InvalidConfs].
%%===========================================================================
%% Helper functions
%%===========================================================================
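%% parse/1 decodes raw HOCON text into a map; check/1 validates that map against
%% the bridge schema, throwing on validation errors.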
parse(Hocon) ->
    {ok, Conf} = hocon:binary(Hocon),
    Conf.

check(Conf) when is_map(Conf) ->
    hocon_tconf:check_plain(emqx_bridge_schema, Conf).
%%===========================================================================
%% Data section
%%===========================================================================
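%% Raw HOCON fixtures used by the tests above.  Note: the """ delimiters below
%% behave as ordinary adjacent string literals (an empty string next to one long
%% multi-line literal), not OTP-27 triple-quoted strings, which is why embedded
%% quotes stay escaped and erlfmt is told to leave these forms as-is.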
%% erlfmt-ignore
kafka_producer_old_hocon(_WithLocalTopic = true) ->
    kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n");
kafka_producer_old_hocon(_WithLocalTopic = false) ->
    kafka_producer_old_hocon("mqtt {}\n");
kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) ->
"""
bridges.kafka {
  myproducer {
    authentication = \"none\"
    bootstrap_hosts = \"toxiproxy:9292\"
    connect_timeout = \"5s\"
    metadata_request_timeout = \"5s\"
    min_metadata_refresh_interval = \"3s\"
    producer {
      kafka {
        buffer {
          memory_overload_protection = false
          mode = \"memory\"
          per_partition_limit = \"2GB\"
          segment_bytes = \"100MB\"
        }
        compression = \"no_compression\"
        max_batch_bytes = \"896KB\"
        max_inflight = 10
        message {
          key = \"${.clientid}\"
          timestamp = \"${.timestamp}\"
          value = \"${.}\"
        }
        partition_count_refresh_interval = \"60s\"
        partition_strategy = \"random\"
        required_acks = \"all_isr\"
        topic = \"test-topic-two-partitions\"
      }
""" ++ MQTTConfig ++
"""
    }
    socket_opts {
      nodelay = true
      recbuf = \"1024KB\"
      sndbuf = \"1024KB\"
    }
    ssl {enable = false, verify = \"verify_peer\"}
  }
}
""".
kafka_producer_new_hocon() ->
    ""
    "\n"
    "bridges.kafka {\n"
    "  myproducer {\n"
    "    authentication = \"none\"\n"
    "    bootstrap_hosts = \"toxiproxy:9292\"\n"
    "    connect_timeout = \"5s\"\n"
    "    metadata_request_timeout = \"5s\"\n"
    "    min_metadata_refresh_interval = \"3s\"\n"
    "    kafka {\n"
    "      buffer {\n"
    "        memory_overload_protection = false\n"
    "        mode = \"memory\"\n"
    "        per_partition_limit = \"2GB\"\n"
    "        segment_bytes = \"100MB\"\n"
    "      }\n"
    "      compression = \"no_compression\"\n"
    "      max_batch_bytes = \"896KB\"\n"
    "      max_inflight = 10\n"
    "      message {\n"
    "        key = \"${.clientid}\"\n"
    "        timestamp = \"${.timestamp}\"\n"
    "        value = \"${.}\"\n"
    "      }\n"
    "      partition_count_refresh_interval = \"60s\"\n"
    "      partition_strategy = \"random\"\n"
    "      required_acks = \"all_isr\"\n"
    "      topic = \"test-topic-two-partitions\"\n"
    "    }\n"
    "    local_topic = \"mqtt/local\"\n"
    "    socket_opts {\n"
    "      nodelay = true\n"
    "      recbuf = \"1024KB\"\n"
    "      sndbuf = \"1024KB\"\n"
    "    }\n"
    "    ssl {enable = false, verify = \"verify_peer\"}\n"
    "  }\n"
    "}\n"
    "".
%% erlfmt-ignore
kafka_consumer_hocon() ->
"""
bridges.kafka_consumer.my_consumer {
  enable = true
  bootstrap_hosts = \"kafka-1.emqx.net:9292\"
  connect_timeout = 5s
  min_metadata_refresh_interval = 3s
  metadata_request_timeout = 5s
  authentication = {
    mechanism = plain
    username = emqxuser
    password = password
  }
  kafka {
    max_batch_bytes = 896KB
    max_rejoin_attempts = 5
    offset_commit_interval_seconds = 3
    offset_reset_policy = latest
  }
  topic_mapping = [
    {
      kafka_topic = \"kafka-topic-1\"
      mqtt_topic = \"mqtt/topic/1\"
      qos = 1
      payload_template = \"${.}\"
    },
    {
      kafka_topic = \"kafka-topic-2\"
      mqtt_topic = \"mqtt/topic/2\"
      qos = 2
      payload_template = \"v = ${.value}\"
    }
  ]
  key_encoding_mode = none
  value_encoding_mode = none
  ssl {
    enable = false
    verify = verify_none
    server_name_indication = \"auto\"
  }
}
""".