emqx_bridge_kafka {
connect_timeout.desc:
"""Maximum wait time for TCP connection establishment (including authentication time if enabled)."""
connect_timeout.label:
"""Connect Timeout"""
producer_opts.desc:
"""Local MQTT data source and Kafka bridge configs."""
producer_opts.label:
"""MQTT to Kafka"""
min_metadata_refresh_interval.desc:
"""Minimum time interval the client has to wait before refreshing Kafka broker and topic metadata. Setting too small value may add extra load on Kafka."""
min_metadata_refresh_interval.label:
"""Min Metadata Refresh Interval"""
kafka_producer.desc:
"""Kafka Producer configuration."""
kafka_producer.label:
"""Kafka Producer"""
producer_buffer.desc:
"""Configure producer message buffer.
Tell Kafka producer how to buffer messages when EMQX has more messages to send than Kafka can keep up, or when Kafka is down."""
producer_buffer.label:
"""Message Buffer"""
socket_send_buffer.desc:
"""Fine tune the socket send buffer. The default value is tuned for high throughput."""
socket_send_buffer.label:
"""Socket Send Buffer Size"""
desc_name.desc:
"""Bridge name, used as a human-readable description of the bridge."""
desc_name.label:
"""Bridge Name"""
consumer_offset_commit_interval_seconds.desc:
"""Defines the time interval between two offset commit requests sent for each consumer group."""
consumer_offset_commit_interval_seconds.label:
"""Offset Commit Interval"""
consumer_max_batch_bytes.desc:
"""Set how many bytes to pull from Kafka in each fetch request. Please note that if the configured value is smaller than the message size in Kafka, it may negatively impact the fetch performance."""
consumer_max_batch_bytes.label:
"""Fetch Bytes"""
socket_receive_buffer.desc:
"""Fine tune the socket receive buffer. The default value is tuned for high throughput."""
socket_receive_buffer.label:
"""Socket Receive Buffer Size"""
consumer_topic_mapping.desc:
"""Defines the mapping between Kafka topics and MQTT topics. Must contain at least one item."""
consumer_topic_mapping.label:
"""Topic Mapping"""
producer_kafka_opts.desc:
"""Kafka producer configs."""
producer_kafka_opts.label:
"""Kafka Producer"""
kafka_topic.desc:
"""Kafka topic name"""
kafka_topic.label:
"""Kafka Topic Name"""
consumer_kafka_topic.desc:
"""Kafka topic to consume from."""
consumer_kafka_topic.label:
"""Kafka Topic"""
auth_username_password.desc:
"""Username/password based authentication."""
auth_username_password.label:
"""Username/password Auth"""
auth_sasl_password.desc:
"""SASL authentication password."""
auth_sasl_password.label:
"""Password"""
kafka_message_timestamp.desc:
"""Which timestamp to use. The timestamp is expected to be a millisecond precision Unix epoch which can be in string format, e.g. 1661326462115
or '1661326462115'
. When the desired data field for this template is not found, or if the found data is not a valid integer, the current system timestamp will be used."""
kafka_message_timestamp.label:
"""Message Timestamp"""
buffer_mode.desc:
"""Message buffer mode.
memory
: Buffer all messages in memory. The messages will be lost in case of EMQX node restart
disk
: Buffer all messages on disk. The messages on disk are able to survive EMQX node restart.
hybrid
: Buffer message in memory first, when up to certain limit (see segment_bytes
config for more information), then start offloading messages to disk, Like memory
mode, the messages will be lost in case of EMQX node restart."""
buffer_mode.label:
"""Buffer Mode"""
consumer_mqtt_qos.desc:
"""MQTT QoS used to publish messages consumed from Kafka."""
consumer_mqtt_qos.label:
"""QoS"""
consumer_key_encoding_mode.desc:
"""Defines how the key from the Kafka message is encoded before being forwarded via MQTT.
none
Uses the key from the Kafka message unchanged. Note: in this case, the key must be a valid UTF-8 string.
base64
Uses base-64 encoding on the received key."""
consumer_key_encoding_mode.label:
"""Key Encoding Mode"""
auth_gssapi_kerberos.desc:
"""Use GSSAPI/Kerberos authentication."""
auth_gssapi_kerberos.label:
"""GSSAPI/Kerberos"""
consumer_mqtt_opts.desc:
"""Local MQTT message publish."""
consumer_mqtt_opts.label:
"""MQTT publish"""
auth_kerberos_principal.desc:
"""SASL GSSAPI authentication Kerberos principal. For example client_name@MY.KERBEROS.REALM.MYDOMAIN.COM
, NOTE: The realm in use has to be configured in /etc/krb5.conf in EMQX nodes."""
auth_kerberos_principal.label:
"""Kerberos Principal"""
socket_opts.desc:
"""Extra socket options."""
socket_opts.label:
"""Socket Options"""
consumer_mqtt_topic.desc:
"""Local topic to which consumed Kafka messages should be published to."""
consumer_mqtt_topic.label:
"""MQTT Topic"""
consumer_offset_reset_policy.desc:
"""Defines from which offset a consumer should start fetching when there is no commit history or when the commit history becomes invalid."""
consumer_offset_reset_policy.label:
"""Offset Reset Policy"""
partition_count_refresh_interval.desc:
"""The time interval for Kafka producer to discover increased number of partitions.
After the number of partitions is increased in Kafka, EMQX will start taking the
discovered partitions into account when dispatching messages per partition_strategy
."""
partition_count_refresh_interval.label:
"""Partition Count Refresh Interval"""
max_batch_bytes.desc:
"""Maximum bytes to collect in a Kafka message batch. Most of the Kafka brokers default to a limit of 1 MB batch size. EMQX's default value is less than 1 MB in order to compensate Kafka message encoding overheads (especially when each individual message is very small). When a single message is over the limit, it is still sent (as a single element batch)."""
max_batch_bytes.label:
"""Max Batch Bytes"""
required_acks.desc:
"""Required acknowledgements for Kafka partition leader to wait for its followers before it sends back the acknowledgement to EMQX Kafka producer
all_isr
: Require all in-sync replicas to acknowledge.
leader_only
: Require only the partition-leader's acknowledgement.
none
: No need for Kafka to acknowledge at all."""
required_acks.label:
"""Required Acks"""
metadata_request_timeout.desc:
"""Maximum wait time when fetching metadata from Kafka."""
metadata_request_timeout.label:
"""Metadata Request Timeout"""
desc_type.desc:
"""The Bridge Type"""
desc_type.label:
"""Bridge Type"""
socket_nodelay.desc:
"""When set to 'true', TCP buffer is sent as soon as possible. Otherwise, the OS kernel may buffer small TCP packets for a while (40 ms by default)."""
socket_nodelay.label:
"""No Delay"""
authentication.desc:
"""Authentication configs."""
authentication.label:
"""Authentication"""
buffer_memory_overload_protection.desc:
"""Applicable when buffer mode is set to memory
EMQX will drop old buffered messages under high memory pressure. The high memory threshold is defined in config sysmon.os.sysmem_high_watermark
. NOTE: This config only works on Linux."""
buffer_memory_overload_protection.label:
"""Memory Overload Protection"""
auth_sasl_mechanism.desc:
"""SASL authentication mechanism."""
auth_sasl_mechanism.label:
"""Mechanism"""
config_enable.desc:
"""Enable (true) or disable (false) this Kafka bridge."""
config_enable.label:
"""Enable or Disable"""
consumer_mqtt_payload.desc:
"""The template for transforming the incoming Kafka message. By default, it will use JSON format to serialize inputs from the Kafka message. Such fields are:
headers
: an object containing string key-value pairs.
key
: Kafka message key (uses the chosen key encoding).
offset
: offset for the message.
topic
: Kafka topic.
ts
: message timestamp.
ts_type
: message timestamp type, which is one of create
, append
or undefined
.
value
: Kafka message value (uses the chosen value encoding)."""
consumer_mqtt_payload.label:
"""MQTT Payload Template"""
consumer_opts.desc:
"""Local MQTT publish and Kafka consumer configs."""
consumer_opts.label:
"""MQTT to Kafka"""
kafka_consumer.desc:
"""Kafka Consumer configuration."""
kafka_consumer.label:
"""Kafka Consumer"""
desc_config.desc:
"""Configuration for a Kafka bridge."""
desc_config.label:
"""Kafka Bridge Configuration"""
consumer_value_encoding_mode.desc:
"""Defines how the value from the Kafka message is encoded before being forwarded via MQTT.
none
Uses the value from the Kafka message unchanged. Note: in this case, the value must be a valid UTF-8 string.
base64
Uses base-64 encoding on the received value."""
consumer_value_encoding_mode.label:
"""Value Encoding Mode"""
buffer_per_partition_limit.desc:
"""Number of bytes allowed to buffer for each Kafka partition. When this limit is exceeded, old messages will be dropped in a trade for credits for new messages to be buffered."""
buffer_per_partition_limit.label:
"""Per-partition Buffer Limit"""
bootstrap_hosts.desc:
"""A comma separated list of Kafka host[:port]
endpoints to bootstrap the client. Default port number is 9092."""
bootstrap_hosts.label:
"""Bootstrap Hosts"""
consumer_max_rejoin_attempts.desc:
"""Maximum number of times allowed for a member to re-join the group. If the consumer group can not reach balance after this configured number of attempts, the consumer group member will restart after a delay."""
consumer_max_rejoin_attempts.label:
"""Max Rejoin Attempts"""
kafka_message_key.desc:
"""Template to render Kafka message key. If the template is rendered into a NULL value (i.e. there is no such data field in Rule Engine context) then Kafka's NULL
(but not empty string) is used."""
kafka_message_key.label:
"""Message Key"""
kafka_message.desc:
"""Template to render a Kafka message."""
kafka_message.label:
"""Kafka Message Template"""
mqtt_topic.desc:
"""MQTT topic or topic filter as data source (bridge input). If rule action is used as data source, this config should be left empty, otherwise messages will be duplicated in Kafka."""
mqtt_topic.label:
"""Source MQTT Topic"""
kafka_message_value.desc:
"""Template to render Kafka message value. If the template is rendered into a NULL value (i.e. there is no such data field in Rule Engine context) then Kafka's NULL
(but not empty string) is used."""
kafka_message_value.label:
"""Message Value"""
partition_strategy.desc:
"""Partition strategy is to tell the producer how to dispatch messages to Kafka partitions.
random
: Randomly pick a partition for each message
key_dispatch
: Hash Kafka message key to a partition number"""
partition_strategy.label:
"""Partition Strategy"""
buffer_segment_bytes.desc:
"""Applicable when buffer mode is set to disk
or hybrid
.
This value is to specify the size of each on-disk buffer file."""
buffer_segment_bytes.label:
"""Segment File Bytes"""
consumer_kafka_opts.desc:
"""Kafka consumer configs."""
consumer_kafka_opts.label:
"""Kafka Consumer"""
max_inflight.desc:
"""Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. Greater value typically means better throughput. However, there can be a risk of message reordering when this value is greater than 1."""
max_inflight.label:
"""Max Inflight"""
auth_sasl_username.desc:
"""SASL authentication username."""
auth_sasl_username.label:
"""Username"""
auth_kerberos_keytab_file.desc:
"""SASL GSSAPI authentication Kerberos keytab file path. NOTE: This file has to be placed in EMQX nodes, and the EMQX service runner user requires read permission."""
auth_kerberos_keytab_file.label:
"""Kerberos keytab file"""
compression.desc:
"""Compression method."""
compression.label:
"""Compression"""
}