test: reorganize test suite a bit
This commit is contained in:
parent
90571b7d8e
commit
36b5d58957
|
@ -25,6 +25,10 @@
|
|||
|
||||
-define(TYPE, kafka_producer).
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% CT boilerplate
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
all() ->
|
||||
emqx_common_test_helpers:all(?MODULE).
|
||||
|
||||
|
@ -51,6 +55,118 @@ end_per_suite(Config) ->
|
|||
emqx_cth_suite:stop(Apps),
|
||||
ok.
|
||||
|
||||
%%-------------------------------------------------------------------------------------
|
||||
%% Helper fns
|
||||
%%-------------------------------------------------------------------------------------
|
||||
|
||||
check_send_message_with_bridge(BridgeName) ->
|
||||
%% ######################################
|
||||
%% Create Kafka message
|
||||
%% ######################################
|
||||
Time = erlang:unique_integer(),
|
||||
BinTime = integer_to_binary(Time),
|
||||
Payload = list_to_binary("payload" ++ integer_to_list(Time)),
|
||||
Msg = #{
|
||||
clientid => BinTime,
|
||||
payload => Payload,
|
||||
timestamp => Time
|
||||
},
|
||||
Offset = resolve_kafka_offset(),
|
||||
%% ######################################
|
||||
%% Send message
|
||||
%% ######################################
|
||||
emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
|
||||
%% ######################################
|
||||
%% Check if message is sent to Kafka
|
||||
%% ######################################
|
||||
check_kafka_message_payload(Offset, Payload).
|
||||
|
||||
resolve_kafka_offset() ->
|
||||
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
|
||||
Partition = 0,
|
||||
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
|
||||
{ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
|
||||
Hosts, KafkaTopic, Partition
|
||||
),
|
||||
Offset0.
|
||||
|
||||
check_kafka_message_payload(Offset, ExpectedPayload) ->
|
||||
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
|
||||
Partition = 0,
|
||||
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
|
||||
{ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
|
||||
?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
|
||||
|
||||
bridge_v2_config(ConnectorName) ->
|
||||
#{
|
||||
<<"connector">> => ConnectorName,
|
||||
<<"enable">> => true,
|
||||
<<"kafka">> => #{
|
||||
<<"buffer">> => #{
|
||||
<<"memory_overload_protection">> => false,
|
||||
<<"mode">> => <<"memory">>,
|
||||
<<"per_partition_limit">> => <<"2GB">>,
|
||||
<<"segment_bytes">> => <<"100MB">>
|
||||
},
|
||||
<<"compression">> => <<"no_compression">>,
|
||||
<<"kafka_header_value_encode_mode">> => <<"none">>,
|
||||
<<"max_batch_bytes">> => <<"896KB">>,
|
||||
<<"max_inflight">> => 10,
|
||||
<<"message">> => #{
|
||||
<<"key">> => <<"${.clientid}">>,
|
||||
<<"timestamp">> => <<"${.timestamp}">>,
|
||||
<<"value">> => <<"${.payload}">>
|
||||
},
|
||||
<<"partition_count_refresh_interval">> => <<"60s">>,
|
||||
<<"partition_strategy">> => <<"random">>,
|
||||
<<"query_mode">> => <<"sync">>,
|
||||
<<"required_acks">> => <<"all_isr">>,
|
||||
<<"sync_query_timeout">> => <<"5s">>,
|
||||
<<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
|
||||
},
|
||||
<<"local_topic">> => <<"kafka_t/#">>,
|
||||
<<"resource_opts">> => #{
|
||||
<<"health_check_interval">> => <<"15s">>
|
||||
}
|
||||
}.
|
||||
|
||||
connector_config() ->
|
||||
#{
|
||||
<<"authentication">> => <<"none">>,
|
||||
<<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
|
||||
<<"connect_timeout">> => <<"5s">>,
|
||||
<<"enable">> => true,
|
||||
<<"metadata_request_timeout">> => <<"5s">>,
|
||||
<<"min_metadata_refresh_interval">> => <<"3s">>,
|
||||
<<"socket_opts">> =>
|
||||
#{
|
||||
<<"recbuf">> => <<"1024KB">>,
|
||||
<<"sndbuf">> => <<"1024KB">>,
|
||||
<<"tcp_keepalive">> => <<"none">>
|
||||
},
|
||||
<<"ssl">> =>
|
||||
#{
|
||||
<<"ciphers">> => [],
|
||||
<<"depth">> => 10,
|
||||
<<"enable">> => false,
|
||||
<<"hibernate_after">> => <<"5s">>,
|
||||
<<"log_level">> => <<"notice">>,
|
||||
<<"reuse_sessions">> => true,
|
||||
<<"secure_renegotiate">> => true,
|
||||
<<"verify">> => <<"verify_peer">>,
|
||||
<<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
|
||||
}
|
||||
}.
|
||||
|
||||
kafka_hosts_string() ->
|
||||
KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
|
||||
KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
|
||||
KafkaHost ++ ":" ++ KafkaPort.
|
||||
|
||||
%%------------------------------------------------------------------------------
|
||||
%% Testcases
|
||||
%%------------------------------------------------------------------------------
|
||||
|
||||
t_create_remove_list(_) ->
|
||||
[] = emqx_bridge_v2:list(),
|
||||
ConnectorConfig = connector_config(),
|
||||
|
@ -186,107 +302,3 @@ t_unknown_topic(_Config) ->
|
|||
emqx_bridge_v2_testlib:get_bridge_api(?TYPE, BridgeName)
|
||||
),
|
||||
ok.
|
||||
|
||||
check_send_message_with_bridge(BridgeName) ->
|
||||
%% ######################################
|
||||
%% Create Kafka message
|
||||
%% ######################################
|
||||
Time = erlang:unique_integer(),
|
||||
BinTime = integer_to_binary(Time),
|
||||
Payload = list_to_binary("payload" ++ integer_to_list(Time)),
|
||||
Msg = #{
|
||||
clientid => BinTime,
|
||||
payload => Payload,
|
||||
timestamp => Time
|
||||
},
|
||||
Offset = resolve_kafka_offset(),
|
||||
%% ######################################
|
||||
%% Send message
|
||||
%% ######################################
|
||||
emqx_bridge_v2:send_message(?TYPE, BridgeName, Msg, #{}),
|
||||
%% ######################################
|
||||
%% Check if message is sent to Kafka
|
||||
%% ######################################
|
||||
check_kafka_message_payload(Offset, Payload).
|
||||
|
||||
resolve_kafka_offset() ->
|
||||
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
|
||||
Partition = 0,
|
||||
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
|
||||
{ok, Offset0} = emqx_bridge_kafka_impl_producer_SUITE:resolve_kafka_offset(
|
||||
Hosts, KafkaTopic, Partition
|
||||
),
|
||||
Offset0.
|
||||
|
||||
check_kafka_message_payload(Offset, ExpectedPayload) ->
|
||||
KafkaTopic = emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition(),
|
||||
Partition = 0,
|
||||
Hosts = emqx_bridge_kafka_impl_producer_SUITE:kafka_hosts(),
|
||||
{ok, {_, [KafkaMsg0]}} = brod:fetch(Hosts, KafkaTopic, Partition, Offset),
|
||||
?assertMatch(#kafka_message{value = ExpectedPayload}, KafkaMsg0).
|
||||
|
||||
bridge_v2_config(ConnectorName) ->
|
||||
#{
|
||||
<<"connector">> => ConnectorName,
|
||||
<<"enable">> => true,
|
||||
<<"kafka">> => #{
|
||||
<<"buffer">> => #{
|
||||
<<"memory_overload_protection">> => false,
|
||||
<<"mode">> => <<"memory">>,
|
||||
<<"per_partition_limit">> => <<"2GB">>,
|
||||
<<"segment_bytes">> => <<"100MB">>
|
||||
},
|
||||
<<"compression">> => <<"no_compression">>,
|
||||
<<"kafka_header_value_encode_mode">> => <<"none">>,
|
||||
<<"max_batch_bytes">> => <<"896KB">>,
|
||||
<<"max_inflight">> => 10,
|
||||
<<"message">> => #{
|
||||
<<"key">> => <<"${.clientid}">>,
|
||||
<<"timestamp">> => <<"${.timestamp}">>,
|
||||
<<"value">> => <<"${.payload}">>
|
||||
},
|
||||
<<"partition_count_refresh_interval">> => <<"60s">>,
|
||||
<<"partition_strategy">> => <<"random">>,
|
||||
<<"query_mode">> => <<"sync">>,
|
||||
<<"required_acks">> => <<"all_isr">>,
|
||||
<<"sync_query_timeout">> => <<"5s">>,
|
||||
<<"topic">> => emqx_bridge_kafka_impl_producer_SUITE:test_topic_one_partition()
|
||||
},
|
||||
<<"local_topic">> => <<"kafka_t/#">>,
|
||||
<<"resource_opts">> => #{
|
||||
<<"health_check_interval">> => <<"15s">>
|
||||
}
|
||||
}.
|
||||
|
||||
connector_config() ->
|
||||
#{
|
||||
<<"authentication">> => <<"none">>,
|
||||
<<"bootstrap_hosts">> => iolist_to_binary(kafka_hosts_string()),
|
||||
<<"connect_timeout">> => <<"5s">>,
|
||||
<<"enable">> => true,
|
||||
<<"metadata_request_timeout">> => <<"5s">>,
|
||||
<<"min_metadata_refresh_interval">> => <<"3s">>,
|
||||
<<"socket_opts">> =>
|
||||
#{
|
||||
<<"recbuf">> => <<"1024KB">>,
|
||||
<<"sndbuf">> => <<"1024KB">>,
|
||||
<<"tcp_keepalive">> => <<"none">>
|
||||
},
|
||||
<<"ssl">> =>
|
||||
#{
|
||||
<<"ciphers">> => [],
|
||||
<<"depth">> => 10,
|
||||
<<"enable">> => false,
|
||||
<<"hibernate_after">> => <<"5s">>,
|
||||
<<"log_level">> => <<"notice">>,
|
||||
<<"reuse_sessions">> => true,
|
||||
<<"secure_renegotiate">> => true,
|
||||
<<"verify">> => <<"verify_peer">>,
|
||||
<<"versions">> => [<<"tlsv1.3">>, <<"tlsv1.2">>]
|
||||
}
|
||||
}.
|
||||
|
||||
kafka_hosts_string() ->
|
||||
KafkaHost = os:getenv("KAFKA_PLAIN_HOST", "kafka-1.emqx.net"),
|
||||
KafkaPort = os:getenv("KAFKA_PLAIN_PORT", "9092"),
|
||||
KafkaHost ++ ":" ++ KafkaPort.
|
||||
|
|
Loading…
Reference in New Issue