%%-------------------------------------------------------------------- %% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved. %%-------------------------------------------------------------------- -module(emqx_bridge_kafka_tests). -include_lib("eunit/include/eunit.hrl"). %%=========================================================================== %% Test cases %%=========================================================================== kafka_producer_test() -> Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)), Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)), Conf3 = parse(kafka_producer_new_hocon()), ?assertMatch( #{ <<"bridges">> := #{ <<"kafka">> := #{ <<"myproducer">> := #{<<"kafka">> := #{}} } } }, check(Conf1) ), ?assertNotMatch( #{ <<"bridges">> := #{ <<"kafka">> := #{ <<"myproducer">> := #{<<"local_topic">> := _} } } }, check(Conf1) ), ?assertMatch( #{ <<"bridges">> := #{ <<"kafka">> := #{ <<"myproducer">> := #{ <<"kafka">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } } }, check(Conf2) ), ?assertMatch( #{ <<"bridges">> := #{ <<"kafka">> := #{ <<"myproducer">> := #{ <<"kafka">> := #{}, <<"local_topic">> := <<"mqtt/local">> } } } }, check(Conf3) ), ok. kafka_consumer_test() -> Conf1 = parse(kafka_consumer_hocon()), ?assertMatch( #{ <<"bridges">> := #{ <<"kafka_consumer">> := #{ <<"my_consumer">> := _ } } }, check(Conf1) ), %% Bad: can't repeat kafka topics. BadConf1 = emqx_utils_maps:deep_put( [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>], Conf1, [ #{ <<"kafka_topic">> => <<"t1">>, <<"mqtt_topic">> => <<"mqtt/t1">>, <<"qos">> => 1, <<"payload_template">> => <<"${.}">> }, #{ <<"kafka_topic">> => <<"t1">>, <<"mqtt_topic">> => <<"mqtt/t2">>, <<"qos">> => 2, <<"payload_template">> => <<"v = ${.value}">> } ] ), ?assertThrow( {_, [ #{ path := "bridges.kafka_consumer.my_consumer.topic_mapping", reason := "Kafka topics must not be repeated in a bridge" } ]}, check(BadConf1) ), %% Bad: there must be at least 1 mapping. BadConf2 = emqx_utils_maps:deep_put( [<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>], Conf1, [] ), ?assertThrow( {_, [ #{ path := "bridges.kafka_consumer.my_consumer.topic_mapping", reason := "There must be at least one Kafka-MQTT topic mapping" } ]}, check(BadConf2) ), ok. message_key_dispatch_validations_test() -> Conf0 = kafka_producer_new_hocon(), Conf1 = Conf0 ++ "\n" "bridges.kafka.myproducer.kafka.message.key = \"\"" "\n" "bridges.kafka.myproducer.kafka.partition_strategy = \"key_dispatch\"", Conf = parse(Conf1), ?assertMatch( #{ <<"kafka">> := #{ <<"partition_strategy">> := <<"key_dispatch">>, <<"message">> := #{<<"key">> := <<>>} } }, emqx_utils_maps:deep_get([<<"bridges">>, <<"kafka">>, <<"myproducer">>], Conf) ), ?assertThrow( {_, [ #{ path := "bridges.kafka.myproducer.kafka", reason := "Message key cannot be empty when `key_dispatch` strategy is used" } ]}, check(Conf) ), ok. tcp_keepalive_validation_test_() -> ProducerConf = parse(kafka_producer_new_hocon()), ConsumerConf = parse(kafka_consumer_hocon()), test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++ test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf). test_keepalive_validation(Name, Conf) -> Path = [<<"bridges">>] ++ Name ++ [<<"socket_opts">>, <<"tcp_keepalive">>], Conf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,7">>), Conf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"none">>), ValidConfs = [Conf, Conf1, Conf2], InvalidConf = emqx_utils_maps:deep_force_put(Path, Conf, <<"invalid">>), InvalidConf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6">>), InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>), InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2], [?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++ [?_assertThrow(_, check(C)) || C <- InvalidConfs]. %%=========================================================================== %% Helper functions %%=========================================================================== parse(Hocon) -> {ok, Conf} = hocon:binary(Hocon), Conf. check(Conf) when is_map(Conf) -> hocon_tconf:check_plain(emqx_bridge_schema, Conf). %%=========================================================================== %% Data section %%=========================================================================== %% erlfmt-ignore kafka_producer_old_hocon(_WithLocalTopic = true) -> kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n"); kafka_producer_old_hocon(_WithLocalTopic = false) -> kafka_producer_old_hocon("mqtt {}\n"); kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) -> """ bridges.kafka { myproducer { authentication = \"none\" bootstrap_hosts = \"toxiproxy:9292\" connect_timeout = \"5s\" metadata_request_timeout = \"5s\" min_metadata_refresh_interval = \"3s\" producer { kafka { buffer { memory_overload_protection = false mode = \"memory\" per_partition_limit = \"2GB\" segment_bytes = \"100MB\" } compression = \"no_compression\" max_batch_bytes = \"896KB\" max_inflight = 10 message { key = \"${.clientid}\" timestamp = \"${.timestamp}\" value = \"${.}\" } partition_count_refresh_interval = \"60s\" partition_strategy = \"random\" required_acks = \"all_isr\" topic = \"test-topic-two-partitions\" } """ ++ MQTTConfig ++ """ } socket_opts { nodelay = true recbuf = \"1024KB\" sndbuf = \"1024KB\" } ssl {enable = false, verify = \"verify_peer\"} } } """. kafka_producer_new_hocon() -> "" "\n" "bridges.kafka {\n" " myproducer {\n" " authentication = \"none\"\n" " bootstrap_hosts = \"toxiproxy:9292\"\n" " connect_timeout = \"5s\"\n" " metadata_request_timeout = \"5s\"\n" " min_metadata_refresh_interval = \"3s\"\n" " kafka {\n" " buffer {\n" " memory_overload_protection = false\n" " mode = \"memory\"\n" " per_partition_limit = \"2GB\"\n" " segment_bytes = \"100MB\"\n" " }\n" " compression = \"no_compression\"\n" " max_batch_bytes = \"896KB\"\n" " max_inflight = 10\n" " message {\n" " key = \"${.clientid}\"\n" " timestamp = \"${.timestamp}\"\n" " value = \"${.}\"\n" " }\n" " partition_count_refresh_interval = \"60s\"\n" " partition_strategy = \"random\"\n" " required_acks = \"all_isr\"\n" " topic = \"test-topic-two-partitions\"\n" " }\n" " local_topic = \"mqtt/local\"\n" " socket_opts {\n" " nodelay = true\n" " recbuf = \"1024KB\"\n" " sndbuf = \"1024KB\"\n" " }\n" " ssl {enable = false, verify = \"verify_peer\"}\n" " }\n" "}\n" "". %% erlfmt-ignore kafka_consumer_hocon() -> """ bridges.kafka_consumer.my_consumer { enable = true bootstrap_hosts = \"kafka-1.emqx.net:9292\" connect_timeout = 5s min_metadata_refresh_interval = 3s metadata_request_timeout = 5s authentication = { mechanism = plain username = emqxuser password = password } kafka { max_batch_bytes = 896KB max_rejoin_attempts = 5 offset_commit_interval_seconds = 3 offset_reset_policy = latest } topic_mapping = [ { kafka_topic = \"kafka-topic-1\" mqtt_topic = \"mqtt/topic/1\" qos = 1 payload_template = \"${.}\" }, { kafka_topic = \"kafka-topic-2\" mqtt_topic = \"mqtt/topic/2\" qos = 2 payload_template = \"v = ${.value}\" } ] key_encoding_mode = none value_encoding_mode = none ssl { enable = false verify = verify_none server_name_indication = \"auto\" } } """.