chore(test): fix formatting quirks

This commit is contained in:
Andrew Mayorov 2023-11-07 23:14:33 +07:00
parent c300eb41a7
commit 18cd98def6
No known key found for this signature in database
GPG Key ID: 2837C62ACFBFED5D
2 changed files with 192 additions and 223 deletions

View File

@ -1018,112 +1018,89 @@ hocon_config(Args, ConfigTemplateFun) ->
),
Hocon.
%% erlfmt-ignore
hocon_config_template() ->
"""
bridges.kafka.{{ bridge_name }} {
bootstrap_hosts = \"{{ kafka_hosts_string }}\"
enable = true
authentication = {{{ authentication }}}
ssl = {{{ ssl }}}
local_topic = \"{{ local_topic }}\"
kafka = {
message = {
key = \"${clientid}\"
value = \"${.payload}\"
timestamp = \"${timestamp}\"
}
buffer = {
memory_overload_protection = false
}
partition_strategy = {{ partition_strategy }}
topic = \"{{ kafka_topic }}\"
query_mode = {{ query_mode }}
}
metadata_request_timeout = 5s
min_metadata_refresh_interval = 3s
socket_opts {
nodelay = true
}
connect_timeout = 5s
}
""".
"bridges.kafka.{{ bridge_name }} {"
"\n bootstrap_hosts = \"{{ kafka_hosts_string }}\""
"\n enable = true"
"\n authentication = {{{ authentication }}}"
"\n ssl = {{{ ssl }}}"
"\n local_topic = \"{{ local_topic }}\""
"\n kafka = {"
"\n message = {"
"\n key = \"${clientid}\""
"\n value = \"${.payload}\""
"\n timestamp = \"${timestamp}\""
"\n }"
"\n buffer = {"
"\n memory_overload_protection = false"
"\n }"
"\n partition_strategy = {{ partition_strategy }}"
"\n topic = \"{{ kafka_topic }}\""
"\n query_mode = {{ query_mode }}"
"\n }"
"\n metadata_request_timeout = 5s"
"\n min_metadata_refresh_interval = 3s"
"\n socket_opts {"
"\n nodelay = true"
"\n }"
"\n connect_timeout = 5s"
"\n }".
%% erlfmt-ignore
hocon_config_template_with_headers() ->
"""
bridges.kafka.{{ bridge_name }} {
bootstrap_hosts = \"{{ kafka_hosts_string }}\"
enable = true
authentication = {{{ authentication }}}
ssl = {{{ ssl }}}
local_topic = \"{{ local_topic }}\"
kafka = {
message = {
key = \"${clientid}\"
value = \"${.payload}\"
timestamp = \"${timestamp}\"
}
buffer = {
memory_overload_protection = false
}
kafka_headers = \"{{ kafka_headers }}\"
kafka_header_value_encode_mode: json
kafka_ext_headers: {{{ kafka_ext_headers }}}
partition_strategy = {{ partition_strategy }}
topic = \"{{ kafka_topic }}\"
query_mode = {{ query_mode }}
}
metadata_request_timeout = 5s
min_metadata_refresh_interval = 3s
socket_opts {
nodelay = true
}
connect_timeout = 5s
}
""".
"bridges.kafka.{{ bridge_name }} {"
"\n bootstrap_hosts = \"{{ kafka_hosts_string }}\""
"\n enable = true"
"\n authentication = {{{ authentication }}}"
"\n ssl = {{{ ssl }}}"
"\n local_topic = \"{{ local_topic }}\""
"\n kafka = {"
"\n message = {"
"\n key = \"${clientid}\""
"\n value = \"${.payload}\""
"\n timestamp = \"${timestamp}\""
"\n }"
"\n buffer = {"
"\n memory_overload_protection = false"
"\n }"
"\n kafka_headers = \"{{ kafka_headers }}\""
"\n kafka_header_value_encode_mode: json"
"\n kafka_ext_headers: {{{ kafka_ext_headers }}}"
"\n partition_strategy = {{ partition_strategy }}"
"\n topic = \"{{ kafka_topic }}\""
"\n query_mode = {{ query_mode }}"
"\n }"
"\n metadata_request_timeout = 5s"
"\n min_metadata_refresh_interval = 3s"
"\n socket_opts {"
"\n nodelay = true"
"\n }"
"\n connect_timeout = 5s"
"\n }".
%% erlfmt-ignore
hocon_config_template_authentication("none") ->
"none";
hocon_config_template_authentication(#{"mechanism" := _}) ->
"""
{
mechanism = {{ mechanism }}
password = {{ password }}
username = {{ username }}
}
""";
"{"
"\n mechanism = {{ mechanism }}"
"\n password = {{ password }}"
"\n username = {{ username }}"
"\n }";
hocon_config_template_authentication(#{"kerberos_principal" := _}) ->
"""
{
kerberos_principal = \"{{ kerberos_principal }}\"
kerberos_keytab_file = \"{{ kerberos_keytab_file }}\"
}
""".
"{"
"\n kerberos_principal = \"{{ kerberos_principal }}\""
"\n kerberos_keytab_file = \"{{ kerberos_keytab_file }}\""
"\n }".
%% erlfmt-ignore
hocon_config_template_ssl(Map) when map_size(Map) =:= 0 ->
"""
{
enable = false
}
""";
"{ enable = false }";
hocon_config_template_ssl(#{"enable" := "false"}) ->
"""
{
enable = false
}
""";
"{ enable = false }";
hocon_config_template_ssl(#{"enable" := "true"}) ->
"""
{
enable = true
cacertfile = \"{{{cacertfile}}}\"
certfile = \"{{{certfile}}}\"
keyfile = \"{{{keyfile}}}\"
}
""".
"{ enable = true"
"\n cacertfile = \"{{{cacertfile}}}\""
"\n certfile = \"{{{certfile}}}\""
"\n keyfile = \"{{{keyfile}}}\""
"\n }".
kafka_hosts_string(tcp, none) ->
kafka_hosts_string();

View File

@ -223,144 +223,136 @@ check_atom_key(Conf) when is_map(Conf) ->
%% Data section
%%===========================================================================
%% erlfmt-ignore
kafka_producer_old_hocon(_WithLocalTopic = true) ->
kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n");
kafka_producer_old_hocon(_WithLocalTopic = false) ->
kafka_producer_old_hocon("mqtt {}\n");
kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) ->
"""
bridges.kafka {
myproducer {
authentication = \"none\"
bootstrap_hosts = \"toxiproxy:9292\"
connect_timeout = \"5s\"
metadata_request_timeout = \"5s\"
min_metadata_refresh_interval = \"3s\"
producer {
kafka {
buffer {
memory_overload_protection = false
mode = \"memory\"
per_partition_limit = \"2GB\"
segment_bytes = \"100MB\"
}
compression = \"no_compression\"
max_batch_bytes = \"896KB\"
max_inflight = 10
message {
key = \"${.clientid}\"
timestamp = \"${.timestamp}\"
value = \"${.}\"
}
partition_count_refresh_interval = \"60s\"
partition_strategy = \"random\"
required_acks = \"all_isr\"
topic = \"test-topic-two-partitions\"
}
""" ++ MQTTConfig ++
"""
}
socket_opts {
nodelay = true
recbuf = \"1024KB\"
sndbuf = \"1024KB\"
}
ssl {enable = false, verify = \"verify_peer\"}
}
}
""".
[
"bridges.kafka {"
"\n myproducer {"
"\n authentication = \"none\""
"\n bootstrap_hosts = \"toxiproxy:9292\""
"\n connect_timeout = \"5s\""
"\n metadata_request_timeout = \"5s\""
"\n min_metadata_refresh_interval = \"3s\""
"\n producer {"
"\n kafka {"
"\n buffer {"
"\n memory_overload_protection = false"
"\n mode = \"memory\""
"\n per_partition_limit = \"2GB\""
"\n segment_bytes = \"100MB\""
"\n }"
"\n compression = \"no_compression\""
"\n max_batch_bytes = \"896KB\""
"\n max_inflight = 10"
"\n message {"
"\n key = \"${.clientid}\""
"\n timestamp = \"${.timestamp}\""
"\n value = \"${.}\""
"\n }"
"\n partition_count_refresh_interval = \"60s\""
"\n partition_strategy = \"random\""
"\n required_acks = \"all_isr\""
"\n topic = \"test-topic-two-partitions\""
"\n }",
MQTTConfig,
"\n }"
"\n socket_opts {"
"\n nodelay = true"
"\n recbuf = \"1024KB\""
"\n sndbuf = \"1024KB\""
"\n }"
"\n ssl {enable = false, verify = \"verify_peer\"}"
"\n }"
"\n}"
].
kafka_producer_new_hocon() ->
""
"\n"
"bridges.kafka {\n"
" myproducer {\n"
" authentication = \"none\"\n"
" bootstrap_hosts = \"toxiproxy:9292\"\n"
" connect_timeout = \"5s\"\n"
" metadata_request_timeout = \"5s\"\n"
" min_metadata_refresh_interval = \"3s\"\n"
" kafka {\n"
" buffer {\n"
" memory_overload_protection = false\n"
" mode = \"memory\"\n"
" per_partition_limit = \"2GB\"\n"
" segment_bytes = \"100MB\"\n"
" }\n"
" compression = \"no_compression\"\n"
" max_batch_bytes = \"896KB\"\n"
" max_inflight = 10\n"
" message {\n"
" key = \"${.clientid}\"\n"
" timestamp = \"${.timestamp}\"\n"
" value = \"${.}\"\n"
" }\n"
" partition_count_refresh_interval = \"60s\"\n"
" partition_strategy = \"random\"\n"
" required_acks = \"all_isr\"\n"
" topic = \"test-topic-two-partitions\"\n"
" }\n"
" local_topic = \"mqtt/local\"\n"
" socket_opts {\n"
" nodelay = true\n"
" recbuf = \"1024KB\"\n"
" sndbuf = \"1024KB\"\n"
" }\n"
" ssl {enable = false, verify = \"verify_peer\"}\n"
" resource_opts {\n"
" health_check_interval = 10s\n"
" }\n"
" }\n"
"}\n"
"".
"bridges.kafka {"
"\n myproducer {"
"\n authentication = \"none\""
"\n bootstrap_hosts = \"toxiproxy:9292\""
"\n connect_timeout = \"5s\""
"\n metadata_request_timeout = \"5s\""
"\n min_metadata_refresh_interval = \"3s\""
"\n kafka {"
"\n buffer {"
"\n memory_overload_protection = false"
"\n mode = \"memory\""
"\n per_partition_limit = \"2GB\""
"\n segment_bytes = \"100MB\""
"\n }"
"\n compression = \"no_compression\""
"\n max_batch_bytes = \"896KB\""
"\n max_inflight = 10"
"\n message {"
"\n key = \"${.clientid}\""
"\n timestamp = \"${.timestamp}\""
"\n value = \"${.}\""
"\n }"
"\n partition_count_refresh_interval = \"60s\""
"\n partition_strategy = \"random\""
"\n required_acks = \"all_isr\""
"\n topic = \"test-topic-two-partitions\""
"\n }"
"\n local_topic = \"mqtt/local\""
"\n socket_opts {"
"\n nodelay = true"
"\n recbuf = \"1024KB\""
"\n sndbuf = \"1024KB\""
"\n }"
"\n ssl {enable = false, verify = \"verify_peer\"}"
"\n resource_opts {"
"\n health_check_interval = 10s"
"\n }"
"\n }"
"\n}".
%% erlfmt-ignore
kafka_consumer_hocon() ->
"""
bridges.kafka_consumer.my_consumer {
enable = true
bootstrap_hosts = \"kafka-1.emqx.net:9292\"
connect_timeout = 5s
min_metadata_refresh_interval = 3s
metadata_request_timeout = 5s
authentication = {
mechanism = plain
username = emqxuser
password = password
}
kafka {
max_batch_bytes = 896KB
max_rejoin_attempts = 5
offset_commit_interval_seconds = 3s
offset_reset_policy = latest
}
topic_mapping = [
{
kafka_topic = \"kafka-topic-1\"
mqtt_topic = \"mqtt/topic/1\"
qos = 1
payload_template = \"${.}\"
},
{
kafka_topic = \"kafka-topic-2\"
mqtt_topic = \"mqtt/topic/2\"
qos = 2
payload_template = \"v = ${.value}\"
}
]
key_encoding_mode = none
value_encoding_mode = none
ssl {
enable = false
verify = verify_none
server_name_indication = \"auto\"
}
resource_opts {
health_check_interval = 10s
}
}
""".
"bridges.kafka_consumer.my_consumer {"
"\n enable = true"
"\n bootstrap_hosts = \"kafka-1.emqx.net:9292\""
"\n connect_timeout = 5s"
"\n min_metadata_refresh_interval = 3s"
"\n metadata_request_timeout = 5s"
"\n authentication = {"
"\n mechanism = plain"
"\n username = emqxuser"
"\n password = password"
"\n }"
"\n kafka {"
"\n max_batch_bytes = 896KB"
"\n max_rejoin_attempts = 5"
"\n offset_commit_interval_seconds = 3s"
"\n offset_reset_policy = latest"
"\n }"
"\n topic_mapping = ["
"\n {"
"\n kafka_topic = \"kafka-topic-1\""
"\n mqtt_topic = \"mqtt/topic/1\""
"\n qos = 1"
"\n payload_template = \"${.}\""
"\n },"
"\n {"
"\n kafka_topic = \"kafka-topic-2\""
"\n mqtt_topic = \"mqtt/topic/2\""
"\n qos = 2"
"\n payload_template = \"v = ${.value}\""
"\n }"
"\n ]"
"\n key_encoding_mode = none"
"\n value_encoding_mode = none"
"\n ssl {"
"\n enable = false"
"\n verify = verify_none"
"\n server_name_indication = \"auto\""
"\n }"
"\n resource_opts {"
"\n health_check_interval = 10s"
"\n }"
"\n }".
%% assert compatibility
bridge_schema_json_test() ->