From d2901afd1b4f1734dfdc9479c7b3a6f682e66ad9 Mon Sep 17 00:00:00 2001 From: Stefan Strigler Date: Thu, 16 Nov 2023 09:41:56 +0100 Subject: [PATCH] fix(emqx_bridge_kafka): match example in api schema --- .../src/emqx_bridge_kafka.erl | 85 ++++++++++--------- 1 file changed, 43 insertions(+), 42 deletions(-) diff --git a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl index 5b3e3ca01..93515b5db 100644 --- a/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl +++ b/apps/emqx_bridge_kafka/src/emqx_bridge_kafka.erl @@ -112,16 +112,15 @@ values({put, connector}) -> values({put, KafkaType}) -> maps:merge(values(common_config), values(KafkaType)); values(bridge_v2_producer) -> - maps:merge( - #{ - enable => true, - connector => <<"my_kafka_producer_connector">>, - resource_opts => #{ - health_check_interval => "32s" - } - }, - values(producer) - ); + #{ + enable => true, + connector => <<"my_kafka_producer_connector">>, + parameters => values(producer_values), + local_topic => <<"mqtt/local/topic">>, + resource_opts => #{ + health_check_interval => "32s" + } + }; values(common_config) -> #{ authentication => #{ @@ -143,40 +142,42 @@ values(common_config) -> }; values(producer) -> #{ - kafka => #{ - topic => <<"kafka-topic">>, - message => #{ - key => <<"${.clientid}">>, - value => <<"${.}">>, - timestamp => <<"${.timestamp}">> - }, - max_batch_bytes => <<"896KB">>, - compression => <<"no_compression">>, - partition_strategy => <<"random">>, - required_acks => <<"all_isr">>, - partition_count_refresh_interval => <<"60s">>, - kafka_headers => <<"${pub_props}">>, - kafka_ext_headers => [ - #{ - kafka_ext_header_key => <<"clientid">>, - kafka_ext_header_value => <<"${clientid}">> - }, - #{ - kafka_ext_header_key => <<"topic">>, - kafka_ext_header_value => <<"${topic}">> - } - ], - kafka_header_value_encode_mode => none, - max_inflight => 10, - buffer => #{ - mode => <<"hybrid">>, - per_partition_limit => <<"2GB">>, - segment_bytes => <<"100MB">>, - memory_overload_protection => true - } - }, + kafka => values(producer_values), local_topic => <<"mqtt/local/topic">> }; +values(producer_values) -> + #{ + topic => <<"kafka-topic">>, + message => #{ + key => <<"${.clientid}">>, + value => <<"${.}">>, + timestamp => <<"${.timestamp}">> + }, + max_batch_bytes => <<"896KB">>, + compression => <<"no_compression">>, + partition_strategy => <<"random">>, + required_acks => <<"all_isr">>, + partition_count_refresh_interval => <<"60s">>, + kafka_headers => <<"${pub_props}">>, + kafka_ext_headers => [ + #{ + kafka_ext_header_key => <<"clientid">>, + kafka_ext_header_value => <<"${clientid}">> + }, + #{ + kafka_ext_header_key => <<"topic">>, + kafka_ext_header_value => <<"${topic}">> + } + ], + kafka_header_value_encode_mode => none, + max_inflight => 10, + buffer => #{ + mode => <<"hybrid">>, + per_partition_limit => <<"2GB">>, + segment_bytes => <<"100MB">>, + memory_overload_protection => true + } + }; values(consumer) -> #{ kafka => #{