diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl index b37ef00e9..62745e924 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_impl_producer_SUITE.erl @@ -1018,112 +1018,89 @@ hocon_config(Args, ConfigTemplateFun) -> ), Hocon. -%% erlfmt-ignore hocon_config_template() -> -""" -bridges.kafka.{{ bridge_name }} { - bootstrap_hosts = \"{{ kafka_hosts_string }}\" - enable = true - authentication = {{{ authentication }}} - ssl = {{{ ssl }}} - local_topic = \"{{ local_topic }}\" - kafka = { - message = { - key = \"${clientid}\" - value = \"${.payload}\" - timestamp = \"${timestamp}\" - } - buffer = { - memory_overload_protection = false - } - partition_strategy = {{ partition_strategy }} - topic = \"{{ kafka_topic }}\" - query_mode = {{ query_mode }} - } - metadata_request_timeout = 5s - min_metadata_refresh_interval = 3s - socket_opts { - nodelay = true - } - connect_timeout = 5s -} -""". + "bridges.kafka.{{ bridge_name }} {" + "\n bootstrap_hosts = \"{{ kafka_hosts_string }}\"" + "\n enable = true" + "\n authentication = {{{ authentication }}}" + "\n ssl = {{{ ssl }}}" + "\n local_topic = \"{{ local_topic }}\"" + "\n kafka = {" + "\n message = {" + "\n key = \"${clientid}\"" + "\n value = \"${.payload}\"" + "\n timestamp = \"${timestamp}\"" + "\n }" + "\n buffer = {" + "\n memory_overload_protection = false" + "\n }" + "\n partition_strategy = {{ partition_strategy }}" + "\n topic = \"{{ kafka_topic }}\"" + "\n query_mode = {{ query_mode }}" + "\n }" + "\n metadata_request_timeout = 5s" + "\n min_metadata_refresh_interval = 3s" + "\n socket_opts {" + "\n nodelay = true" + "\n }" + "\n connect_timeout = 5s" + "\n }". -%% erlfmt-ignore hocon_config_template_with_headers() -> -""" -bridges.kafka.{{ bridge_name }} { - bootstrap_hosts = \"{{ kafka_hosts_string }}\" - enable = true - authentication = {{{ authentication }}} - ssl = {{{ ssl }}} - local_topic = \"{{ local_topic }}\" - kafka = { - message = { - key = \"${clientid}\" - value = \"${.payload}\" - timestamp = \"${timestamp}\" - } - buffer = { - memory_overload_protection = false - } - kafka_headers = \"{{ kafka_headers }}\" - kafka_header_value_encode_mode: json - kafka_ext_headers: {{{ kafka_ext_headers }}} - partition_strategy = {{ partition_strategy }} - topic = \"{{ kafka_topic }}\" - query_mode = {{ query_mode }} - } - metadata_request_timeout = 5s - min_metadata_refresh_interval = 3s - socket_opts { - nodelay = true - } - connect_timeout = 5s -} -""". + "bridges.kafka.{{ bridge_name }} {" + "\n bootstrap_hosts = \"{{ kafka_hosts_string }}\"" + "\n enable = true" + "\n authentication = {{{ authentication }}}" + "\n ssl = {{{ ssl }}}" + "\n local_topic = \"{{ local_topic }}\"" + "\n kafka = {" + "\n message = {" + "\n key = \"${clientid}\"" + "\n value = \"${.payload}\"" + "\n timestamp = \"${timestamp}\"" + "\n }" + "\n buffer = {" + "\n memory_overload_protection = false" + "\n }" + "\n kafka_headers = \"{{ kafka_headers }}\"" + "\n kafka_header_value_encode_mode: json" + "\n kafka_ext_headers: {{{ kafka_ext_headers }}}" + "\n partition_strategy = {{ partition_strategy }}" + "\n topic = \"{{ kafka_topic }}\"" + "\n query_mode = {{ query_mode }}" + "\n }" + "\n metadata_request_timeout = 5s" + "\n min_metadata_refresh_interval = 3s" + "\n socket_opts {" + "\n nodelay = true" + "\n }" + "\n connect_timeout = 5s" + "\n }". -%% erlfmt-ignore hocon_config_template_authentication("none") -> "none"; hocon_config_template_authentication(#{"mechanism" := _}) -> -""" -{ - mechanism = {{ mechanism }} - password = {{ password }} - username = {{ username }} -} -"""; + "{" + "\n mechanism = {{ mechanism }}" + "\n password = {{ password }}" + "\n username = {{ username }}" + "\n }"; hocon_config_template_authentication(#{"kerberos_principal" := _}) -> -""" -{ - kerberos_principal = \"{{ kerberos_principal }}\" - kerberos_keytab_file = \"{{ kerberos_keytab_file }}\" -} -""". + "{" + "\n kerberos_principal = \"{{ kerberos_principal }}\"" + "\n kerberos_keytab_file = \"{{ kerberos_keytab_file }}\"" + "\n }". -%% erlfmt-ignore hocon_config_template_ssl(Map) when map_size(Map) =:= 0 -> -""" -{ - enable = false -} -"""; + "{ enable = false }"; hocon_config_template_ssl(#{"enable" := "false"}) -> -""" -{ - enable = false -} -"""; + "{ enable = false }"; hocon_config_template_ssl(#{"enable" := "true"}) -> -""" -{ - enable = true - cacertfile = \"{{{cacertfile}}}\" - certfile = \"{{{certfile}}}\" - keyfile = \"{{{keyfile}}}\" -} -""". + "{ enable = true" + "\n cacertfile = \"{{{cacertfile}}}\"" + "\n certfile = \"{{{certfile}}}\"" + "\n keyfile = \"{{{keyfile}}}\"" + "\n }". kafka_hosts_string(tcp, none) -> kafka_hosts_string(); diff --git a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl index 1d9682b9b..ff4334a85 100644 --- a/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl +++ b/apps/emqx_bridge_kafka/test/emqx_bridge_kafka_tests.erl @@ -223,144 +223,136 @@ check_atom_key(Conf) when is_map(Conf) -> %% Data section %%=========================================================================== -%% erlfmt-ignore kafka_producer_old_hocon(_WithLocalTopic = true) -> kafka_producer_old_hocon("mqtt {topic = \"mqtt/local\"}\n"); kafka_producer_old_hocon(_WithLocalTopic = false) -> kafka_producer_old_hocon("mqtt {}\n"); kafka_producer_old_hocon(MQTTConfig) when is_list(MQTTConfig) -> -""" -bridges.kafka { - myproducer { - authentication = \"none\" - bootstrap_hosts = \"toxiproxy:9292\" - connect_timeout = \"5s\" - metadata_request_timeout = \"5s\" - min_metadata_refresh_interval = \"3s\" - producer { - kafka { - buffer { - memory_overload_protection = false - mode = \"memory\" - per_partition_limit = \"2GB\" - segment_bytes = \"100MB\" - } - compression = \"no_compression\" - max_batch_bytes = \"896KB\" - max_inflight = 10 - message { - key = \"${.clientid}\" - timestamp = \"${.timestamp}\" - value = \"${.}\" - } - partition_count_refresh_interval = \"60s\" - partition_strategy = \"random\" - required_acks = \"all_isr\" - topic = \"test-topic-two-partitions\" - } -""" ++ MQTTConfig ++ -""" - } - socket_opts { - nodelay = true - recbuf = \"1024KB\" - sndbuf = \"1024KB\" - } - ssl {enable = false, verify = \"verify_peer\"} - } -} -""". + [ + "bridges.kafka {" + "\n myproducer {" + "\n authentication = \"none\"" + "\n bootstrap_hosts = \"toxiproxy:9292\"" + "\n connect_timeout = \"5s\"" + "\n metadata_request_timeout = \"5s\"" + "\n min_metadata_refresh_interval = \"3s\"" + "\n producer {" + "\n kafka {" + "\n buffer {" + "\n memory_overload_protection = false" + "\n mode = \"memory\"" + "\n per_partition_limit = \"2GB\"" + "\n segment_bytes = \"100MB\"" + "\n }" + "\n compression = \"no_compression\"" + "\n max_batch_bytes = \"896KB\"" + "\n max_inflight = 10" + "\n message {" + "\n key = \"${.clientid}\"" + "\n timestamp = \"${.timestamp}\"" + "\n value = \"${.}\"" + "\n }" + "\n partition_count_refresh_interval = \"60s\"" + "\n partition_strategy = \"random\"" + "\n required_acks = \"all_isr\"" + "\n topic = \"test-topic-two-partitions\"" + "\n }", + MQTTConfig, + "\n }" + "\n socket_opts {" + "\n nodelay = true" + "\n recbuf = \"1024KB\"" + "\n sndbuf = \"1024KB\"" + "\n }" + "\n ssl {enable = false, verify = \"verify_peer\"}" + "\n }" + "\n}" + ]. kafka_producer_new_hocon() -> - "" - "\n" - "bridges.kafka {\n" - " myproducer {\n" - " authentication = \"none\"\n" - " bootstrap_hosts = \"toxiproxy:9292\"\n" - " connect_timeout = \"5s\"\n" - " metadata_request_timeout = \"5s\"\n" - " min_metadata_refresh_interval = \"3s\"\n" - " kafka {\n" - " buffer {\n" - " memory_overload_protection = false\n" - " mode = \"memory\"\n" - " per_partition_limit = \"2GB\"\n" - " segment_bytes = \"100MB\"\n" - " }\n" - " compression = \"no_compression\"\n" - " max_batch_bytes = \"896KB\"\n" - " max_inflight = 10\n" - " message {\n" - " key = \"${.clientid}\"\n" - " timestamp = \"${.timestamp}\"\n" - " value = \"${.}\"\n" - " }\n" - " partition_count_refresh_interval = \"60s\"\n" - " partition_strategy = \"random\"\n" - " required_acks = \"all_isr\"\n" - " topic = \"test-topic-two-partitions\"\n" - " }\n" - " local_topic = \"mqtt/local\"\n" - " socket_opts {\n" - " nodelay = true\n" - " recbuf = \"1024KB\"\n" - " sndbuf = \"1024KB\"\n" - " }\n" - " ssl {enable = false, verify = \"verify_peer\"}\n" - " resource_opts {\n" - " health_check_interval = 10s\n" - " }\n" - " }\n" - "}\n" - "". + "bridges.kafka {" + "\n myproducer {" + "\n authentication = \"none\"" + "\n bootstrap_hosts = \"toxiproxy:9292\"" + "\n connect_timeout = \"5s\"" + "\n metadata_request_timeout = \"5s\"" + "\n min_metadata_refresh_interval = \"3s\"" + "\n kafka {" + "\n buffer {" + "\n memory_overload_protection = false" + "\n mode = \"memory\"" + "\n per_partition_limit = \"2GB\"" + "\n segment_bytes = \"100MB\"" + "\n }" + "\n compression = \"no_compression\"" + "\n max_batch_bytes = \"896KB\"" + "\n max_inflight = 10" + "\n message {" + "\n key = \"${.clientid}\"" + "\n timestamp = \"${.timestamp}\"" + "\n value = \"${.}\"" + "\n }" + "\n partition_count_refresh_interval = \"60s\"" + "\n partition_strategy = \"random\"" + "\n required_acks = \"all_isr\"" + "\n topic = \"test-topic-two-partitions\"" + "\n }" + "\n local_topic = \"mqtt/local\"" + "\n socket_opts {" + "\n nodelay = true" + "\n recbuf = \"1024KB\"" + "\n sndbuf = \"1024KB\"" + "\n }" + "\n ssl {enable = false, verify = \"verify_peer\"}" + "\n resource_opts {" + "\n health_check_interval = 10s" + "\n }" + "\n }" + "\n}". -%% erlfmt-ignore kafka_consumer_hocon() -> -""" -bridges.kafka_consumer.my_consumer { - enable = true - bootstrap_hosts = \"kafka-1.emqx.net:9292\" - connect_timeout = 5s - min_metadata_refresh_interval = 3s - metadata_request_timeout = 5s - authentication = { - mechanism = plain - username = emqxuser - password = password - } - kafka { - max_batch_bytes = 896KB - max_rejoin_attempts = 5 - offset_commit_interval_seconds = 3s - offset_reset_policy = latest - } - topic_mapping = [ - { - kafka_topic = \"kafka-topic-1\" - mqtt_topic = \"mqtt/topic/1\" - qos = 1 - payload_template = \"${.}\" - }, - { - kafka_topic = \"kafka-topic-2\" - mqtt_topic = \"mqtt/topic/2\" - qos = 2 - payload_template = \"v = ${.value}\" - } - ] - key_encoding_mode = none - value_encoding_mode = none - ssl { - enable = false - verify = verify_none - server_name_indication = \"auto\" - } - resource_opts { - health_check_interval = 10s - } -} -""". + "bridges.kafka_consumer.my_consumer {" + "\n enable = true" + "\n bootstrap_hosts = \"kafka-1.emqx.net:9292\"" + "\n connect_timeout = 5s" + "\n min_metadata_refresh_interval = 3s" + "\n metadata_request_timeout = 5s" + "\n authentication = {" + "\n mechanism = plain" + "\n username = emqxuser" + "\n password = password" + "\n }" + "\n kafka {" + "\n max_batch_bytes = 896KB" + "\n max_rejoin_attempts = 5" + "\n offset_commit_interval_seconds = 3s" + "\n offset_reset_policy = latest" + "\n }" + "\n topic_mapping = [" + "\n {" + "\n kafka_topic = \"kafka-topic-1\"" + "\n mqtt_topic = \"mqtt/topic/1\"" + "\n qos = 1" + "\n payload_template = \"${.}\"" + "\n }," + "\n {" + "\n kafka_topic = \"kafka-topic-2\"" + "\n mqtt_topic = \"mqtt/topic/2\"" + "\n qos = 2" + "\n payload_template = \"v = ${.value}\"" + "\n }" + "\n ]" + "\n key_encoding_mode = none" + "\n value_encoding_mode = none" + "\n ssl {" + "\n enable = false" + "\n verify = verify_none" + "\n server_name_indication = \"auto\"" + "\n }" + "\n resource_opts {" + "\n health_check_interval = 10s" + "\n }" + "\n }". %% assert compatibility bridge_schema_json_test() ->