emqx_bridge_kafka {
connect_timeout.desc:
"""建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"""
connect_timeout.label:
"""连接超时"""
producer_opts.desc:
"""本地 MQTT 数据源和 Kafka 桥接的配置。"""
producer_opts.label:
"""MQTT 到 Kafka"""
min_metadata_refresh_interval.desc:
"""刷新 Kafka broker 和 Kafka 主题元数据段最短时间间隔。设置太小可能会增加 Kafka 压力。"""
min_metadata_refresh_interval.label:
"""元数据刷新最小间隔"""
kafka_producer.desc:
"""Kafka Producer 配置。"""
kafka_producer.label:
"""Kafka Producer"""
producer_buffer.desc:
"""配置消息缓存的相关参数。
当 EMQX 需要发送的消息超过 Kafka 处理能力,或者当 Kafka 临时下线时,EMQX 内部会将消息缓存起来。"""
producer_buffer.label:
"""消息缓存"""
socket_send_buffer.desc:
"""TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。"""
socket_send_buffer.label:
"""Socket 发送缓存大小"""
desc_name.desc:
"""桥接名字,可读描述"""
desc_name.label:
"""桥接名字"""
consumer_offset_commit_interval_seconds.desc:
"""指定 Kafka 消费组偏移量提交的时间间隔。"""
consumer_offset_commit_interval_seconds.label:
"""偏移提交间隔"""
consumer_max_batch_bytes.desc:
"""设置每次从 Kafka 拉取数据的字节数。如该配置小于 Kafka 消息的大小,可能会影响消费性能。"""
consumer_max_batch_bytes.label:
"""拉取字节数"""
socket_receive_buffer.desc:
"""TCP socket 的收包缓存调优。默认值是针对高吞吐量的一个推荐值。"""
socket_receive_buffer.label:
"""Socket 收包缓存大小"""
consumer_topic_mapping.desc:
"""指定 Kafka 主题和 MQTT 主题之间的映射关系。 应至少包含一项。"""
consumer_topic_mapping.label:
"""主题映射关系"""
producer_kafka_opts.desc:
"""Kafka 生产者参数。"""
producer_kafka_opts.label:
"""生产者参数"""
kafka_topic.desc:
"""Kafka 主题名称"""
kafka_topic.label:
"""Kafka 主题名称"""
consumer_kafka_topic.desc:
"""指定从哪个 Kafka 主题消费消息。"""
consumer_kafka_topic.label:
"""Kafka 主题"""
auth_username_password.desc:
"""基于用户名密码的认证。"""
auth_username_password.label:
"""用户名密码认证"""
auth_sasl_password.desc:
"""SASL 认证的密码。"""
auth_sasl_password.label:
"""密码"""
kafka_message_timestamp.desc:
"""生成 Kafka 消息时间戳的模版。该时间必需是一个整型数值(可以是字符串格式)例如 1661326462115
或 '1661326462115'
。当所需的输入字段不存在,或不是一个整型时,则会使用当前系统时间。"""
kafka_message_timestamp.label:
"""消息的时间戳"""
buffer_mode.desc:
"""消息缓存模式。
memory
: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。
disk
: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。
hybrid
: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制(配置项 segment_bytes
描述了该限制)后,后续的消息会缓存到磁盘上。与 memory
模式一样,如果 EMQX 服务重启,缓存的消息会丢失。"""
buffer_mode.label:
"""缓存模式"""
consumer_mqtt_qos.desc:
"""转发 MQTT 消息时使用的 QoS。"""
consumer_mqtt_qos.label:
"""QoS"""
consumer_key_encoding_mode.desc:
"""通过 MQTT 转发之前,如何处理 Kafka 消息的 Key。none
使用 Kafka 消息中的 Key 原始值,不进行编码。 注意:在这种情况下,Key 必须是一个有效的 UTF-8 字符串。
base64
对收到的密钥或值使用 base-64 编码。"""
consumer_key_encoding_mode.label:
"""Key 编码模式"""
auth_gssapi_kerberos.desc:
"""使用 GSSAPI/Kerberos 认证。"""
auth_gssapi_kerberos.label:
"""GSSAPI/Kerberos"""
consumer_mqtt_opts.desc:
"""本地 MQTT 消息转发。"""
consumer_mqtt_opts.label:
"""MQTT 转发"""
auth_kerberos_principal.desc:
"""SASL GSSAPI 认证方法的 Kerberos principal,例如 client_name@MY.KERBEROS.REALM.MYDOMAIN.COM
注意:这里使用的 realm 需要配置在 EMQX 服务器的 /etc/krb5.conf 中"""
auth_kerberos_principal.label:
"""Kerberos Principal"""
socket_opts.desc:
"""更多 Socket 参数设置。"""
socket_opts.label:
"""Socket 参数"""
consumer_mqtt_topic.desc:
"""设置 Kafka 消息向哪个本地 MQTT 主题转发消息。"""
consumer_mqtt_topic.label:
"""MQTT主题"""
consumer_offset_reset_policy.desc:
"""如不存在偏移量历史记录或历史记录失效,消费者应使用哪个偏移量开始消费。"""
consumer_offset_reset_policy.label:
"""偏移重置策略"""
partition_count_refresh_interval.desc:
"""配置 Kafka 刷新分区数量的时间间隔。
EMQX 发现 Kafka 分区数量增加后,会开始按 partition_strategy 配置,把消息发送到新的分区中。"""
partition_count_refresh_interval.label:
"""分区数量刷新间隔"""
max_batch_bytes.desc:
"""最大消息批量字节数。大多数 Kafka 环境的默认最低值是 1 MB,EMQX 的默认值比 1 MB 更小是因为需要补偿 Kafka 消息编码所需要的额外字节(尤其是当每条消息都很小的情况下)。当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。"""
max_batch_bytes.label:
"""最大批量字节数"""
required_acks.desc:
"""设置 Kafka leader 在返回给 EMQX 确认之前需要等待多少个 follower 的确认。
all_isr
: 需要所有的在线复制者都确认。
leader_only
: 仅需要分区 leader 确认。
none
: 无需 Kafka 回复任何确认。"""
required_acks.label:
"""Kafka 确认数量"""
metadata_request_timeout.desc:
"""刷新元数据时最大等待时长。"""
metadata_request_timeout.label:
"""元数据请求超时"""
desc_type.desc:
"""桥接类型"""
desc_type.label:
"""桥接类型"""
socket_nodelay.desc:
"""设置‘true’让系统内核立即发送。否则当需要发送的内容很少时,可能会有一定延迟(默认 40 毫秒)。"""
socket_nodelay.label:
"""是否关闭延迟发送"""
authentication.desc:
"""认证参数。"""
authentication.label:
"""认证"""
buffer_memory_overload_protection.desc:
"""缓存模式是 memory
或 hybrid
时适用。当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。内存压力值由配置项 sysmon.os.sysmem_high_watermark
决定。注意,该配置仅在 Linux 系统中有效。"""
buffer_memory_overload_protection.label:
"""内存过载保护"""
auth_sasl_mechanism.desc:
"""SASL 认证方法名称。"""
auth_sasl_mechanism.label:
"""认证方法"""
config_enable.desc:
"""启用(true)或停用该(false)Kafka 数据桥接。"""
config_enable.label:
"""启用或停用"""
consumer_mqtt_payload.desc:
"""用于转换收到的 Kafka 消息的模板。 默认情况下,它将使用 JSON 格式来序列化来自 Kafka 的所有字段。 这些字段包括:headers
:一个包含字符串键值对的 JSON 对象。
key
:Kafka 消息的键(使用选择的编码方式编码)。
offset
:消息的偏移量。
topic
:Kafka 主题。
ts
: 消息的时间戳。
ts_type
:消息的时间戳类型,值可能是: create
, append
或 undefined
。
value
: Kafka 消息值(使用选择的编码方式编码)。"""
consumer_mqtt_payload.label:
"""MQTT Payload Template"""
consumer_opts.desc:
"""本地 MQTT 转发 和 Kafka 消费者配置。"""
consumer_opts.label:
"""MQTT 到 Kafka"""
kafka_consumer.desc:
"""Kafka 消费者配置。"""
kafka_consumer.label:
"""Kafka 消费者"""
desc_config.desc:
"""Kafka 桥接配置"""
desc_config.label:
"""Kafka 桥接配置"""
consumer_value_encoding_mode.desc:
"""通过 MQTT 转发之前,如何处理 Kafka 消息的 Value。none
使用 Kafka 消息中的 Value 原始值,不进行编码。 注意:在这种情况下,Value 必须是一个有效的 UTF-8 字符串。
base64
对收到的 Value 使用 base-64 编码。"""
consumer_value_encoding_mode.label:
"""Value 编码模式"""
buffer_per_partition_limit.desc:
"""为每个 Kafka 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃,为新的消息腾出空间。"""
buffer_per_partition_limit.label:
"""Kafka 分区缓存上限"""
bootstrap_hosts.desc:
"""用逗号分隔的 host[:port]
主机列表。默认端口号为 9092。"""
bootstrap_hosts.label:
"""主机列表"""
consumer_max_rejoin_attempts.desc:
"""消费组成员允许重新加入小组的最大次数。如超过该配置次数后仍未能成功加入消费组,则会在等待一段时间后重试。"""
consumer_max_rejoin_attempts.label:
"""最大的重新加入尝试"""
kafka_message_key.desc:
"""生成 Kafka 消息 Key 的模版。如果模版生成后为空值,则会使用 Kafka 的 NULL
,而非空字符串。"""
kafka_message_key.label:
"""消息的 Key"""
kafka_message.desc:
"""用于生成 Kafka 消息的模版。"""
kafka_message.label:
"""Kafka 消息模版"""
mqtt_topic.desc:
"""MQTT 主题数据源由桥接指定,或留空由规则动作指定。"""
mqtt_topic.label:
"""源 MQTT 主题"""
kafka_message_value.desc:
"""生成 Kafka 消息 Value 的模版。如果模版生成后为空值,则会使用 Kafka 的 NULL
,而非空字符串。"""
kafka_message_value.label:
"""消息的 Value"""
partition_strategy.desc:
"""设置消息发布时应该如何选择 Kafka 分区。
random
: 为每个消息随机选择一个分区。
key_dispatch
: Hash Kafka message key to a partition number"""
partition_strategy.label:
"""分区选择策略"""
buffer_segment_bytes.desc:
"""当缓存模式是 disk
或 hybrid
时适用。该配置用于指定缓存到磁盘上的文件的大小。"""
buffer_segment_bytes.label:
"""缓存文件大小"""
consumer_kafka_opts.desc:
"""Kafka消费者配置。"""
consumer_kafka_opts.label:
"""Kafka 消费者"""
max_inflight.desc:
"""设置 Kafka 生产者(每个分区一个)在收到 Kafka 的确认前最多发送多少个请求(批量)。调大这个值通常可以增加吞吐量,但是,当该值设置大于 1 时存在消息乱序的风险。"""
max_inflight.label:
"""飞行窗口"""
auth_sasl_username.desc:
"""SASL 认证的用户名。"""
auth_sasl_username.label:
"""用户名"""
auth_kerberos_keytab_file.desc:
"""SASL GSSAPI 认证方法的 Kerberos keytab 文件。注意:该文件需要上传到 EMQX 服务器中,且运行 EMQX 服务的系统账户需要有读取权限。"""
auth_kerberos_keytab_file.label:
"""Kerberos keytab 文件"""
compression.desc:
"""压缩方法。"""
compression.label:
"""压缩"""
}