diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl index 6adb66357..fba72a1d7 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_v2_kafka_producer_SUITE.erl @@ -25,6 +25,10 @@ -define(TYPE, kafka_producer). +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + all() -> emqx_common_test_helpers:all(?MODULE). @@ -51,6 +55,118 @@ end_per_suite(Config) -> emqx_cth_suite:stop(Apps), ok. +%%------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------- + +check_send_message_with_bridge(BridgeName) -> + %% ###################################### + %% Create Kafka message + %% ###################################### + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Payload = list_to_binary("payload" ++ integer_to_list(Time)), + Msg = #{ + clientid => BinTime, + payload => Payload, + timestamp => Time + }, + Offset = resolve_kafka_offset(), + %% ###################################### + %% Send message + %% ###################################### + emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}), + %% ###################################### + %% Check if message is sent to Kafka + %% ###################################### + check_kafka_message_payload(Offset, Payload). + +resolve_kafka_offset() -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( + Hosts, KafkaTopic, Partition + ), + Offset0. + +check_kafka_message_payload(Offset, ExpectedPayload) -> + KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), + Partition = 0, + Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), + {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), + ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). + +bridge_v2_config(ConnectorName) -> + #{ + <<"connector">> => ConnectorName, + <<"enable">> => true, + <<"kafka">> => #{ + <<"buffer">> => #{ + <<"memory_overload_protection">> => false, + <<"mode">> => <<"memory">>, + <<"per_partition_limit">> => <<"2GB">>, + <<"segment_bytes">> => <<"100MB">> + }, + <<"compression">> => <<"no_compression">>, + <<"kafka_header_value_encode_mode">> => <<"none">>, + <<"max_batch_bytes">> => <<"896KB">>, + <<"max_inflight">> => 10, + <<"message">> => #{ + <<"key">> => <<"${.clientid}">>, + <<"timestamp">> => <<"${.timestamp}">>, + <<"value">> => <<"${.payload}">> + }, + <<"partition_count_refresh_interval">> => <<"60s">>, + <<"partition_strategy">> => <<"random">>, + <<"query_mode">> => <<"sync">>, + <<"required_acks">> => <<"all_isr">>, + <<"sync_query_timeout">> => <<"5s">>, + <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() + }, + <<"local_topic">> => <<"kafka_t/#">>, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"15s">> + } + }. + +connector_config() -> + #{ + <<"authentication">> => <<"none">>, + <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), + <<"connect_timeout">> => <<"5s">>, + <<"enable">> => true, + <<"metadata_request_timeout">> => <<"5s">>, + <<"min_metadata_refresh_interval">> => <<"3s">>, + <<"socket_opts">> => + #{ + <<"recbuf">> => <<"1024KB">>, + <<"sndbuf">> => <<"1024KB">>, + <<"tcp_keepalive">> => <<"none">> + }, + <<"ssl">> => + #{ + <<"ciphers">> => [], + <<"depth">> => 10, + <<"enable">> => false, + <<"hibernate_after">> => <<"5s">>, + <<"log_level">> => <<"notice">>, + <<"reuse_sessions">> => true, + <<"secure_renegotiate">> => true, + <<"verify">> => <<"verify_peer">>, + <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] + } + }. + +kafka_hosts_string() -> + KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"), + KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), + KafkaHost ++ ":" ++ KafkaPort. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + t_create_remove_list(_) -> [] = emqx_bridge_v2:list(), ConnectorConfig = connector_config(), @@ -186,107 +302,3 @@ t_unknown_topic(_Config) -> emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName) ), ok. - -check_send_message_with_bridge(BridgeName) -> - %% ###################################### - %% Create Kafka message - %% ###################################### - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), - Payload = list_to_binary("payload" ++ integer_to_list(Time)), - Msg = #{ - clientid => BinTime, - payload => Payload, - timestamp => Time - }, - Offset = resolve_kafka_offset(), - %% ###################################### - %% Send message - %% ###################################### - emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}), - %% ###################################### - %% Check if message is sent to Kafka - %% ###################################### - check_kafka_message_payload(Offset, Payload). - -resolve_kafka_offset() -> - KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), - Partition = 0, - Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), - {ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset( - Hosts, KafkaTopic, Partition - ), - Offset0. - -check_kafka_message_payload(Offset, ExpectedPayload) -> - KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(), - Partition = 0, - Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(), - {ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset), - ?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0). - -bridge_v2_config(ConnectorName) -> - #{ - <<"connector">> => ConnectorName, - <<"enable">> => true, - <<"kafka">> => #{ - <<"buffer">> => #{ - <<"memory_overload_protection">> => false, - <<"mode">> => <<"memory">>, - <<"per_partition_limit">> => <<"2GB">>, - <<"segment_bytes">> => <<"100MB">> - }, - <<"compression">> => <<"no_compression">>, - <<"kafka_header_value_encode_mode">> => <<"none">>, - <<"max_batch_bytes">> => <<"896KB">>, - <<"max_inflight">> => 10, - <<"message">> => #{ - <<"key">> => <<"${.clientid}">>, - <<"timestamp">> => <<"${.timestamp}">>, - <<"value">> => <<"${.payload}">> - }, - <<"partition_count_refresh_interval">> => <<"60s">>, - <<"partition_strategy">> => <<"random">>, - <<"query_mode">> => <<"sync">>, - <<"required_acks">> => <<"all_isr">>, - <<"sync_query_timeout">> => <<"5s">>, - <<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition() - }, - <<"local_topic">> => <<"kafka_t/#">>, - <<"resource_opts">> => #{ - <<"health_check_interval">> => <<"15s">> - } - }. - -connector_config() -> - #{ - <<"authentication">> => <<"none">>, - <<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()), - <<"connect_timeout">> => <<"5s">>, - <<"enable">> => true, - <<"metadata_request_timeout">> => <<"5s">>, - <<"min_metadata_refresh_interval">> => <<"3s">>, - <<"socket_opts">> => - #{ - <<"recbuf">> => <<"1024KB">>, - <<"sndbuf">> => <<"1024KB">>, - <<"tcp_keepalive">> => <<"none">> - }, - <<"ssl">> => - #{ - <<"ciphers">> => [], - <<"depth">> => 10, - <<"enable">> => false, - <<"hibernate_after">> => <<"5s">>, - <<"log_level">> => <<"notice">>, - <<"reuse_sessions">> => true, - <<"secure_renegotiate">> => true, - <<"verify">> => <<"verify_peer">>, - <<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>] - } - }. - -kafka_hosts_string() -> - KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"), - KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"), - KafkaHost ++ ":" ++ KafkaPort.