%% emqx/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl

%%--------------------------------------------------------------------
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_kafka_tests).

-include_lib("eunit/include/eunit.hrl").

-export([atoms/0, kafka_producer_old_hocon/1]).

%% Ensure these atoms exist at runtime: the HOCON snippets below refer to the
%% bridge names as binaries, and checking the configs with atom keys relies on
%% the corresponding atoms already existing.
atoms() -> [myproducer, my_consumer].

%%===========================================================================
%% Test cases
%%===========================================================================
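%% The old-style producer config nests the Kafka options under `producer.kafka`
%% and the local MQTT topic under `mqtt.topic`, while the new style flattens
%% them to `kafka` and `local_topic` (see the HOCON snippets in the data
%% section below).  The assertions here verify that both layouts check out to
%% the same normalized structure.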
kafka_producer_test() ->
    Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)),
    Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)),
    Conf3 = parse(kafka_producer_new_hocon()),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{<<"kafka">> := #{}}
                        }
                }
        },
        check(Conf1)
    ),
    ?assertNotMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{<<"local_topic">> := _}
                        }
                }
        },
        check(Conf1)
    ),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{
                                    <<"kafka">> := #{},
                                    <<"local_topic">> := <<"mqtt/local">>
                                }
                        }
                }
        },
        check(Conf2)
    ),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka">> :=
                        #{
                            <<"myproducer">> :=
                                #{
                                    <<"kafka">> := #{},
                                    <<"local_topic">> := <<"mqtt/local">>
                                }
                        }
                }
        },
        check(Conf3)
    ),
    ok.

kafka_consumer_test() ->
    Conf1 = parse(kafka_consumer_hocon()),
    ?assertMatch(
        #{
            <<"bridges">> :=
                #{
                    <<"kafka_consumer">> :=
                        #{
                            <<"my_consumer">> := _
                        }
                }
        },
        check(Conf1)
    ),
    %% Bad: can't repeat kafka topics.
    BadConf1 = emqx_utils_maps:deep_put(
        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
        Conf1,
        [
            #{
                <<"kafka_topic">> => <<"t1">>,
                <<"mqtt_topic">> => <<"mqtt/t1">>,
                <<"qos">> => 1,
                <<"payload_template">> => <<"${.}">>
            },
            #{
                <<"kafka_topic">> => <<"t1">>,
                <<"mqtt_topic">> => <<"mqtt/t2">>,
                <<"qos">> => 2,
                <<"payload_template">> => <<"v = ${.value}">>
            }
        ]
    ),
    ?assertThrow(
        {_, [
            #{
                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
                reason := "Kafka topics must not be repeated in a bridge"
            }
        ]},
        check(BadConf1)
    ),
    %% Bad: there must be at least 1 mapping.
    BadConf2 = emqx_utils_maps:deep_put(
        [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
        Conf1,
        []
    ),
    ?assertThrow(
        {_, [
            #{
                path := "bridges.kafka_consumer.my_consumer.topic_mapping",
                reason := "There must be at least one Kafka-MQTT topic mapping"
            }
        ]},
        check(BadConf2)
    ),
    ok.

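%% `key_dispatch` picks the Kafka partition from the message key, so an empty
%% key template must be rejected both by the creation-time check and by the
%% probe-time (atom-key) check.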
message_key_dispatch_validations_test() ->
    Name = myproducer,
    Conf0 = kafka_producer_new_hocon(),
    Conf1 =
        Conf0 ++
            "\n"
            "bridges.kafka.myproducer.kafka.message.key = \"\""
            "\n"
            "bridges.kafka.myproducer.kafka.partition_strategy = \"key_dispatch\"",
    Conf = parse(Conf1),
    ?assertMatch(
        #{
            <<"kafka">> :=
                #{
                    <<"partition_strategy">> := <<"key_dispatch">>,
                    <<"message">> := #{<<"key">> := <<>>}
                }
        },
        emqx_utils_maps:deep_get(
            [<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf
        )
    ),
    ?assertThrow(
        {_, [
            #{
                path := "bridges.kafka.myproducer.kafka",
                reason := "Message key cannot be empty when `key_dispatch` strategy is used"
            }
        ]},
        check(Conf)
    ),
    ?assertThrow(
        {_, [
            #{
                path := "bridges.kafka.myproducer.kafka",
                reason := "Message key cannot be empty when `key_dispatch` strategy is used"
            }
        ]},
        check_atom_key(Conf)
    ),
    ok.

tcp_keepalive_validation_test_() ->
    ProducerConf = parse(kafka_producer_new_hocon()),
    ConsumerConf = parse(kafka_consumer_hocon()),
    test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
        test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).

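%% `tcp_keepalive` accepts either "none" or three comma-separated integers
%% (idle time, probe interval, probe count); malformed or out-of-range values
%% such as the invalid examples below must be rejected by the schema.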
test_keepalive_validation(Name, Conf) ->
    Path = [<<"bridges">>] ++ Name ++ [<<"socket_opts">>, <<"tcp_keepalive">>],
    Conf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,7">>),
    Conf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"none">>),
    ValidConfs = [Conf, Conf1, Conf2],
    InvalidConf = emqx_utils_maps:deep_force_put(Path, Conf, <<"invalid">>),
    InvalidConf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6">>),
    InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>),
    InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2],
    [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++
        [?_assertMatch(#{bridges := _}, check_atom_key(C)) || C <- ValidConfs] ++
        [?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
        [?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].

%%===========================================================================
%% Helper functions
%%===========================================================================
parse(Hocon) ->
    {ok, Conf} = hocon:binary(Hocon),
    Conf.

%% what bridge creation does
check(Conf) when is_map(Conf) ->
    hocon_tconf:check_plain(emqx_bridge_schema, Conf).

%% what bridge probe does
check_atom_key(Conf) when is_map(Conf) ->
    hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).

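%% Note: `check/1` returns a map with binary keys (matched as
%% `#{<<"bridges">> := _}` above), while `check_atom_key/1` returns atom keys
%% (`#{bridges := _}`), which is why the keepalive test matches on both shapes.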
%%===========================================================================
%% Data section
%%===========================================================================
kafka_producer_old_hocon(_WithLocalTopic = true) ->
    kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n");
kafka_producer_old_hocon(_WithLocalTopic = false) ->
    kafka_producer_old_hocon("mqtt {}\n");
kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) ->
    [
        "bridges.kafka {"
        "\n myproducer {"
        "\n authentication = \"none\""
        "\n bootstrap_hosts = \"toxiproxy:9292\""
        "\n connect_timeout = \"5s\""
        "\n metadata_request_timeout = \"5s\""
        "\n min_metadata_refresh_interval = \"3s\""
        "\n producer {"
        "\n kafka {"
        "\n buffer {"
        "\n memory_overload_protection = false"
        "\n mode = \"memory\""
        "\n per_partition_limit = \"2GB\""
        "\n segment_bytes = \"100MB\""
        "\n }"
        "\n compression = \"no_compression\""
        "\n max_batch_bytes = \"896KB\""
        "\n max_inflight = 10"
        "\n message {"
        "\n key = \"${.clientid}\""
        "\n timestamp = \"${.timestamp}\""
        "\n value = \"${.}\""
        "\n }"
        "\n partition_count_refresh_interval = \"60s\""
        "\n partition_strategy = \"random\""
        "\n required_acks = \"all_isr\""
        "\n topic = \"test-topic-two-partitions\""
        "\n }",
        MQTTConfig,
        "\n }"
        "\n socket_opts {"
        "\n nodelay = true"
        "\n recbuf = \"1024KB\""
        "\n sndbuf = \"1024KB\""
        "\n }"
        "\n ssl {enable = false, verify = \"verify_peer\"}"
        "\n }"
        "\n}"
    ].

kafka_producer_new_hocon() ->
    "bridges.kafka {"
    "\n myproducer {"
    "\n authentication = \"none\""
    "\n bootstrap_hosts = \"toxiproxy:9292\""
    "\n connect_timeout = \"5s\""
    "\n metadata_request_timeout = \"5s\""
    "\n min_metadata_refresh_interval = \"3s\""
    "\n kafka {"
    "\n buffer {"
    "\n memory_overload_protection = false"
    "\n mode = \"memory\""
    "\n per_partition_limit = \"2GB\""
    "\n segment_bytes = \"100MB\""
    "\n }"
    "\n compression = \"no_compression\""
    "\n max_batch_bytes = \"896KB\""
    "\n max_inflight = 10"
    "\n message {"
    "\n key = \"${.clientid}\""
    "\n timestamp = \"${.timestamp}\""
    "\n value = \"${.}\""
    "\n }"
    "\n partition_count_refresh_interval = \"60s\""
    "\n partition_strategy = \"random\""
    "\n required_acks = \"all_isr\""
    "\n topic = \"test-topic-two-partitions\""
    "\n }"
    "\n local_topic = \"mqtt/local\""
    "\n socket_opts {"
    "\n nodelay = true"
    "\n recbuf = \"1024KB\""
    "\n sndbuf = \"1024KB\""
    "\n }"
    "\n ssl {enable = false, verify = \"verify_peer\"}"
    "\n resource_opts {"
    "\n health_check_interval = 10s"
    "\n }"
    "\n }"
    "\n}".

kafka_consumer_hocon() ->
    "bridges.kafka_consumer.my_consumer {"
    "\n enable = true"
    "\n bootstrap_hosts = \"kafka-1.emqx.net:9292\""
    "\n connect_timeout = 5s"
    "\n min_metadata_refresh_interval = 3s"
    "\n metadata_request_timeout = 5s"
    "\n authentication = {"
    "\n mechanism = plain"
    "\n username = emqxuser"
    "\n password = password"
    "\n }"
    "\n kafka {"
    "\n max_batch_bytes = 896KB"
    "\n max_rejoin_attempts = 5"
    "\n offset_commit_interval_seconds = 3s"
    "\n offset_reset_policy = latest"
    "\n }"
    "\n topic_mapping = ["
    "\n {"
    "\n kafka_topic = \"kafka-topic-1\""
    "\n mqtt_topic = \"mqtt/topic/1\""
    "\n qos = 1"
    "\n payload_template = \"${.}\""
    "\n },"
    "\n {"
    "\n kafka_topic = \"kafka-topic-2\""
    "\n mqtt_topic = \"mqtt/topic/2\""
    "\n qos = 2"
    "\n payload_template = \"v = ${.value}\""
    "\n }"
    "\n ]"
    "\n key_encoding_mode = none"
    "\n value_encoding_mode = none"
    "\n ssl {"
    "\n enable = false"
    "\n verify = verify_none"
    "\n server_name_indication = \"auto\""
    "\n }"
    "\n resource_opts {"
    "\n health_check_interval = 10s"
    "\n }"
    "\n }".

%% Assert schema compatibility: the generated bridge schema JSON must still
%% expose the producer's `kafka` properties.
bridge_schema_json_test() ->
    JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
    Map = emqx_utils_json:decode(JSON),
    Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
    ?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).