From 37a35b22df60142726c1a6f08add3e5e01b46c92 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Mon, 13 Mar 2023 14:02:43 -0300 Subject: [PATCH] docs(kafka): improve kafka consumer/producer API examples --- .../src/emqx_ee_bridge_kafka.erl | 80 +++++++++++++++++-- 1 file changed, 72 insertions(+), 8 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index de67a73c6..943009945 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -43,25 +43,89 @@ conn_bridge_examples(Method) -> %% for backwards compatibility. <<"kafka">> => #{ summary => <<"Kafka Producer Bridge">>, - value => values(Method) + value => values({Method, producer}) } }, #{ <<"kafka_consumer">> => #{ summary => <<"Kafka Consumer Bridge">>, - value => values(Method) + value => values({Method, consumer}) } } ]. -values(get) -> - maps:merge(values(post), ?METRICS_EXAMPLE); -values(post) -> +values({get, KafkaType}) -> + maps:merge(values({post, KafkaType}), ?METRICS_EXAMPLE); +values({post, KafkaType}) -> + maps:merge(values(common_config), values(KafkaType)); +values({put, KafkaType}) -> + values({post, KafkaType}); +values(common_config) -> #{ - bootstrap_hosts => <<"localhost:9092">> + authentication => #{ + mechanism => <<"plain">>, + username => <<"username">>, + password => <<"password">> + }, + bootstrap_hosts => <<"localhost:9092">>, + connect_timeout => <<"5s">>, + enable => true, + metadata_request_timeout => <<"4s">>, + min_metadata_refresh_interval => <<"3s">>, + socket_opts => #{ + sndbuf => <<"1024KB">>, + recbuf => <<"1024KB">>, + nodelay => true + } }; -values(put) -> - values(post). +values(producer) -> + #{ + kafka => #{ + topic => <<"kafka-topic">>, + message => #{ + key => <<"${.clientid}">>, + value => <<"${.}">>, + timestamp => <<"${.timestamp}">> + }, + max_batch_bytes => <<"896KB">>, + compression => <<"no_compression">>, + partition_strategy => <<"random">>, + required_acks => <<"all_isr">>, + partition_count_refresh_interval => <<"60s">>, + max_inflight => 10, + buffer => #{ + mode => <<"hybrid">>, + per_partition_limit => <<"2GB">>, + segment_bytes => <<"100MB">>, + memory_overload_protection => true + } + }, + local_topic => <<"mqtt/local/topic">> + }; +values(consumer) -> + #{ + kafka => #{ + max_batch_bytes => <<"896KB">>, + offset_reset_policy => <<"reset_to_latest">>, + offset_commit_interval_seconds => 5 + }, + key_encoding_mode => <<"force_utf8">>, + topic_mapping => [ + #{ + kafka_topic => <<"kafka-topic-1">>, + mqtt_topic => <<"mqtt/topic/1">>, + qos => 1, + payload_template => <<"${.}">> + }, + #{ + kafka_topic => <<"kafka-topic-2">>, + mqtt_topic => <<"mqtt/topic/2">>, + qos => 2, + payload_template => <<"v = ${.value}">> + } + ], + value_encoding_mode => <<"force_utf8">> + }. %% ------------------------------------------------------------------------------------------------- %% Hocon Schema Definitions