364 lines
12 KiB
Erlang
364 lines
12 KiB
Erlang
%%--------------------------------------------------------------------
|
|
%% Copyright (c) 2023-2024 EMQ Technologies Co., Ltd. All Rights Reserved.
|
|
%%--------------------------------------------------------------------
|
|
|
|
-module(emqx_bridge_kafka_tests).
|
|
|
|
-include_lib("eunit/include/eunit.hrl").
|
|
|
|
-export([atoms/0, kafka_producer_old_hocon/1]).
|
|
|
|
%% ensure atoms exist
|
|
atoms() -> [myproducer, my_consumer].
|
|
|
|
%%===========================================================================
|
|
%% Test cases
|
|
%%===========================================================================
|
|
|
|
kafka_producer_test() ->
|
|
Conf1 = parse(kafka_producer_old_hocon(_WithLocalTopic0 = false)),
|
|
Conf2 = parse(kafka_producer_old_hocon(_WithLocalTopic1 = true)),
|
|
Conf3 = parse(kafka_producer_new_hocon()),
|
|
?assertMatch(
|
|
#{
|
|
<<"bridges">> :=
|
|
#{
|
|
<<"kafka">> :=
|
|
#{
|
|
<<"myproducer">> :=
|
|
#{<<"kafka">> := #{}}
|
|
}
|
|
}
|
|
},
|
|
check(Conf1)
|
|
),
|
|
?assertNotMatch(
|
|
#{
|
|
<<"bridges">> :=
|
|
#{
|
|
<<"kafka">> :=
|
|
#{
|
|
<<"myproducer">> :=
|
|
#{<<"local_topic">> := _}
|
|
}
|
|
}
|
|
},
|
|
check(Conf1)
|
|
),
|
|
?assertMatch(
|
|
#{
|
|
<<"bridges">> :=
|
|
#{
|
|
<<"kafka">> :=
|
|
#{
|
|
<<"myproducer">> :=
|
|
#{
|
|
<<"kafka">> := #{},
|
|
<<"local_topic">> := <<"mqtt/local">>
|
|
}
|
|
}
|
|
}
|
|
},
|
|
check(Conf2)
|
|
),
|
|
?assertMatch(
|
|
#{
|
|
<<"bridges">> :=
|
|
#{
|
|
<<"kafka">> :=
|
|
#{
|
|
<<"myproducer">> :=
|
|
#{
|
|
<<"kafka">> := #{},
|
|
<<"local_topic">> := <<"mqtt/local">>
|
|
}
|
|
}
|
|
}
|
|
},
|
|
check(Conf3)
|
|
),
|
|
|
|
ok.
|
|
|
|
kafka_consumer_test() ->
|
|
Conf1 = parse(kafka_consumer_hocon()),
|
|
?assertMatch(
|
|
#{
|
|
<<"bridges">> :=
|
|
#{
|
|
<<"kafka_consumer">> :=
|
|
#{
|
|
<<"my_consumer">> := _
|
|
}
|
|
}
|
|
},
|
|
check(Conf1)
|
|
),
|
|
|
|
%% Bad: can't repeat kafka topics.
|
|
BadConf1 = emqx_utils_maps:deep_put(
|
|
[<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
|
|
Conf1,
|
|
[
|
|
#{
|
|
<<"kafka_topic">> => <<"t1">>,
|
|
<<"mqtt_topic">> => <<"mqtt/t1">>,
|
|
<<"qos">> => 1,
|
|
<<"payload_template">> => <<"${.}">>
|
|
},
|
|
#{
|
|
<<"kafka_topic">> => <<"t1">>,
|
|
<<"mqtt_topic">> => <<"mqtt/t2">>,
|
|
<<"qos">> => 2,
|
|
<<"payload_template">> => <<"v = ${.value}">>
|
|
}
|
|
]
|
|
),
|
|
?assertThrow(
|
|
{_, [
|
|
#{
|
|
path := "bridges.kafka_consumer.my_consumer.topic_mapping",
|
|
reason := "Kafka topics must not be repeated in a bridge"
|
|
}
|
|
]},
|
|
check(BadConf1)
|
|
),
|
|
|
|
%% Bad: there must be at least 1 mapping.
|
|
BadConf2 = emqx_utils_maps:deep_put(
|
|
[<<"bridges">>, <<"kafka_consumer">>, <<"my_consumer">>, <<"topic_mapping">>],
|
|
Conf1,
|
|
[]
|
|
),
|
|
?assertThrow(
|
|
{_, [
|
|
#{
|
|
path := "bridges.kafka_consumer.my_consumer.topic_mapping",
|
|
reason := "There must be at least one Kafka-MQTT topic mapping"
|
|
}
|
|
]},
|
|
check(BadConf2)
|
|
),
|
|
|
|
ok.
|
|
|
|
message_key_dispatch_validations_test() ->
|
|
Name = myproducer,
|
|
Conf0 = kafka_producer_new_hocon(),
|
|
Conf1 =
|
|
Conf0 ++
|
|
"\n"
|
|
"bridges.kafka.myproducer.kafka.message.key = \"\""
|
|
"\n"
|
|
"bridges.kafka.myproducer.kafka.partition_strategy = \"key_dispatch\"",
|
|
Conf = parse(Conf1),
|
|
?assertMatch(
|
|
#{
|
|
<<"kafka">> :=
|
|
#{
|
|
<<"partition_strategy">> := <<"key_dispatch">>,
|
|
<<"message">> := #{<<"key">> := <<>>}
|
|
}
|
|
},
|
|
emqx_utils_maps:deep_get(
|
|
[<<"bridges">>, <<"kafka">>, atom_to_binary(Name)], Conf
|
|
)
|
|
),
|
|
?assertThrow(
|
|
{_, [
|
|
#{
|
|
path := "bridges.kafka.myproducer.kafka",
|
|
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
|
|
}
|
|
]},
|
|
check(Conf)
|
|
),
|
|
?assertThrow(
|
|
{_, [
|
|
#{
|
|
path := "bridges.kafka.myproducer.kafka",
|
|
reason := "Message key cannot be empty when `key_dispatch` strategy is used"
|
|
}
|
|
]},
|
|
check_atom_key(Conf)
|
|
),
|
|
ok.
|
|
|
|
tcp_keepalive_validation_test_() ->
|
|
ProducerConf = parse(kafka_producer_new_hocon()),
|
|
ConsumerConf = parse(kafka_consumer_hocon()),
|
|
test_keepalive_validation([<<"kafka">>, <<"myproducer">>], ProducerConf) ++
|
|
test_keepalive_validation([<<"kafka_consumer">>, <<"my_consumer">>], ConsumerConf).
|
|
|
|
test_keepalive_validation(Name, Conf) ->
|
|
Path = [<<"bridges">>] ++ Name ++ [<<"socket_opts">>, <<"tcp_keepalive">>],
|
|
Conf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,7">>),
|
|
Conf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"none">>),
|
|
ValidConfs = [Conf, Conf1, Conf2],
|
|
InvalidConf = emqx_utils_maps:deep_force_put(Path, Conf, <<"invalid">>),
|
|
InvalidConf1 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6">>),
|
|
InvalidConf2 = emqx_utils_maps:deep_force_put(Path, Conf, <<"5,6,1000">>),
|
|
InvalidConfs = [InvalidConf, InvalidConf1, InvalidConf2],
|
|
[?_assertMatch(#{<<"bridges">> := _}, check(C)) || C <- ValidConfs] ++
|
|
[?_assertMatch(#{bridges := _}, check_atom_key(C)) || C <- ValidConfs] ++
|
|
[?_assertThrow(_, check(C)) || C <- InvalidConfs] ++
|
|
[?_assertThrow(_, check_atom_key(C)) || C <- InvalidConfs].
|
|
|
|
%%===========================================================================
|
|
%% Helper functions
|
|
%%===========================================================================
|
|
|
|
parse(Hocon) ->
|
|
{ok, Conf} = hocon:binary(Hocon),
|
|
Conf.
|
|
|
|
%% what bridge creation does
|
|
check(Conf) when is_map(Conf) ->
|
|
hocon_tconf:check_plain(emqx_bridge_schema, Conf).
|
|
|
|
%% what bridge probe does
|
|
check_atom_key(Conf) when is_map(Conf) ->
|
|
hocon_tconf:check_plain(emqx_bridge_schema, Conf, #{atom_key => true, required => false}).
|
|
|
|
%%===========================================================================
|
|
%% Data section
|
|
%%===========================================================================
|
|
|
|
kafka_producer_old_hocon(_WithLocalTopic = true) ->
|
|
kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n");
|
|
kafka_producer_old_hocon(_WithLocalTopic = false) ->
|
|
kafka_producer_old_hocon("mqtt {}\n");
|
|
kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) ->
|
|
[
|
|
"bridges.kafka {"
|
|
"\n myproducer {"
|
|
"\n authentication = \"none\""
|
|
"\n bootstrap_hosts = \"toxiproxy:9292\""
|
|
"\n connect_timeout = \"5s\""
|
|
"\n metadata_request_timeout = \"5s\""
|
|
"\n min_metadata_refresh_interval = \"3s\""
|
|
"\n producer {"
|
|
"\n kafka {"
|
|
"\n buffer {"
|
|
"\n memory_overload_protection = false"
|
|
"\n mode = \"memory\""
|
|
"\n per_partition_limit = \"2GB\""
|
|
"\n segment_bytes = \"100MB\""
|
|
"\n }"
|
|
"\n compression = \"no_compression\""
|
|
"\n max_batch_bytes = \"896KB\""
|
|
"\n max_inflight = 10"
|
|
"\n message {"
|
|
"\n key = \"${.clientid}\""
|
|
"\n timestamp = \"${.timestamp}\""
|
|
"\n value = \"${.}\""
|
|
"\n }"
|
|
"\n partition_count_refresh_interval = \"60s\""
|
|
"\n partition_strategy = \"random\""
|
|
"\n required_acks = \"all_isr\""
|
|
"\n topic = \"test-topic-two-partitions\""
|
|
"\n }",
|
|
MQTTConfig,
|
|
"\n }"
|
|
"\n socket_opts {"
|
|
"\n nodelay = true"
|
|
"\n recbuf = \"1024KB\""
|
|
"\n sndbuf = \"1024KB\""
|
|
"\n }"
|
|
"\n ssl {enable = false, verify = \"verify_peer\"}"
|
|
"\n }"
|
|
"\n}"
|
|
].
|
|
|
|
kafka_producer_new_hocon() ->
|
|
"bridges.kafka {"
|
|
"\n myproducer {"
|
|
"\n authentication = \"none\""
|
|
"\n bootstrap_hosts = \"toxiproxy:9292\""
|
|
"\n connect_timeout = \"5s\""
|
|
"\n metadata_request_timeout = \"5s\""
|
|
"\n min_metadata_refresh_interval = \"3s\""
|
|
"\n kafka {"
|
|
"\n buffer {"
|
|
"\n memory_overload_protection = false"
|
|
"\n mode = \"memory\""
|
|
"\n per_partition_limit = \"2GB\""
|
|
"\n segment_bytes = \"100MB\""
|
|
"\n }"
|
|
"\n compression = \"no_compression\""
|
|
"\n max_batch_bytes = \"896KB\""
|
|
"\n max_inflight = 10"
|
|
"\n message {"
|
|
"\n key = \"${.clientid}\""
|
|
"\n timestamp = \"${.timestamp}\""
|
|
"\n value = \"${.}\""
|
|
"\n }"
|
|
"\n partition_count_refresh_interval = \"60s\""
|
|
"\n partition_strategy = \"random\""
|
|
"\n required_acks = \"all_isr\""
|
|
"\n topic = \"test-topic-two-partitions\""
|
|
"\n }"
|
|
"\n local_topic = \"mqtt/local\""
|
|
"\n socket_opts {"
|
|
"\n nodelay = true"
|
|
"\n recbuf = \"1024KB\""
|
|
"\n sndbuf = \"1024KB\""
|
|
"\n }"
|
|
"\n ssl {enable = false, verify = \"verify_peer\"}"
|
|
"\n resource_opts {"
|
|
"\n health_check_interval = 10s"
|
|
"\n }"
|
|
"\n }"
|
|
"\n}".
|
|
|
|
kafka_consumer_hocon() ->
|
|
"bridges.kafka_consumer.my_consumer {"
|
|
"\n enable = true"
|
|
"\n bootstrap_hosts = \"kafka-1.emqx.net:9292\""
|
|
"\n connect_timeout = 5s"
|
|
"\n min_metadata_refresh_interval = 3s"
|
|
"\n metadata_request_timeout = 5s"
|
|
"\n authentication = {"
|
|
"\n mechanism = plain"
|
|
"\n username = emqxuser"
|
|
"\n password = password"
|
|
"\n }"
|
|
"\n kafka {"
|
|
"\n max_batch_bytes = 896KB"
|
|
"\n max_rejoin_attempts = 5"
|
|
"\n offset_commit_interval_seconds = 3s"
|
|
"\n offset_reset_policy = latest"
|
|
"\n }"
|
|
"\n topic_mapping = ["
|
|
"\n {"
|
|
"\n kafka_topic = \"kafka-topic-1\""
|
|
"\n mqtt_topic = \"mqtt/topic/1\""
|
|
"\n qos = 1"
|
|
"\n payload_template = \"${.}\""
|
|
"\n },"
|
|
"\n {"
|
|
"\n kafka_topic = \"kafka-topic-2\""
|
|
"\n mqtt_topic = \"mqtt/topic/2\""
|
|
"\n qos = 2"
|
|
"\n payload_template = \"v = ${.value}\""
|
|
"\n }"
|
|
"\n ]"
|
|
"\n key_encoding_mode = none"
|
|
"\n value_encoding_mode = none"
|
|
"\n ssl {"
|
|
"\n enable = false"
|
|
"\n verify = verify_none"
|
|
"\n server_name_indication = \"auto\""
|
|
"\n }"
|
|
"\n resource_opts {"
|
|
"\n health_check_interval = 10s"
|
|
"\n }"
|
|
"\n }".
|
|
|
|
%% assert compatibility
|
|
bridge_schema_json_test() ->
|
|
JSON = iolist_to_binary(emqx_dashboard_schema_api:bridge_schema_json()),
|
|
Map = emqx_utils_json:decode(JSON),
|
|
Path = [<<"components">>, <<"schemas">>, <<"bridge_kafka.post_producer">>, <<"properties">>],
|
|
?assertMatch(#{<<"kafka">> := _}, emqx_utils_maps:deep_get(Path, Map)).
|