feat: Add Kafka connector
parent 477d4b0b03
commit 0c1595be02
@@ -0,0 +1,27 @@
version: '3.9'

services:
  zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
    container_name: zookeeper
    hostname: zookeeper
    networks:
      emqx_bridge:
  kafka_1:
    image: wurstmeister/kafka:2.13-2.7.0
    ports:
      - "9092:9092"
    container_name: kafka-1.emqx.net
    hostname: kafka-1.emqx.net
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1,
    networks:
      emqx_bridge:
@@ -32,6 +32,7 @@
    stop_apps/1,
    reload/2,
    app_path/2,
    proj_root/0,
    deps_path/2,
    flush/0,
    flush/1

@@ -245,6 +246,14 @@ stop_apps(Apps) ->
    [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]],
    ok.

proj_root() ->
    filename:join(
        lists:takewhile(
            fun(X) -> iolist_to_binary(X) =/= <<"_build">> end,
            filename:split(app_path(emqx, "."))
        )
    ).

%% backward compatible
deps_path(App, RelativePath) -> app_path(App, RelativePath).
@@ -334,6 +334,8 @@ get_matched_bridges(Topic) ->
        Bridges
    ).

%% TODO: refactor to return bridge type, and bridge name directly
%% so there is no need to parse the id back to type and name at where it is used
get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) ->
    Acc;
get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when
@@ -44,7 +44,7 @@
]).

%% bi-directional bridge with producer/consumer or ingress/egress configs
-define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>).
-define(IS_BI_DIR_BRIDGE(TYPE), TYPE =:= <<"mqtt">>; TYPE =:= <<"kafka">>).

-if(?EMQX_RELEASE_EDITION == ee).
bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt;

@@ -71,6 +71,8 @@ bridge_id(BridgeType, BridgeName) ->
    Type = bin(BridgeType),
    <<Type/binary, ":", Name/binary>>.

parse_bridge_id(<<"bridge:", BridgeId/binary>>) ->
    parse_bridge_id(BridgeId);
parse_bridge_id(BridgeId) ->
    case string:split(bin(BridgeId), ":", all) of
        [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)};

@@ -261,7 +263,7 @@ parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) ->
    %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it
    %% receives a message from the external database.
    BName = bridge_id(Type, Name),
    Conf#{hookpoint => <<"$bridges/", BName/binary>>};
    Conf#{hookpoint => <<"$bridges/", BName/binary>>, bridge_name => Name};
parse_confs(_Type, _Name, Conf) ->
    Conf.
@@ -93,7 +93,8 @@
    %% verify if the resource is working normally
    call_health_check/3,
    %% stop the instance
    call_stop/3
    call_stop/3,
    is_buffer_supported/1
]).

%% list all the instances, id only.

@@ -117,7 +118,8 @@
    on_batch_query/3,
    on_query_async/4,
    on_batch_query_async/4,
    on_get_status/2
    on_get_status/2,
    is_buffer_supported/0
]).

%% when calling emqx_resource:start/1

@@ -155,6 +157,8 @@
    | {resource_status(), resource_state()}
    | {resource_status(), resource_state(), term()}.

-callback is_buffer_supported() -> boolean().

-spec list_types() -> [module()].
list_types() ->
    discover_resource_mods().

@@ -256,10 +260,15 @@ query(ResId, Request) ->
    Result :: term().
query(ResId, Request, Opts) ->
    case emqx_resource_manager:ets_lookup(ResId) of
        {ok, _Group, #{query_mode := QM}} ->
            case QM of
                sync -> emqx_resource_worker:sync_query(ResId, Request, Opts);
                async -> emqx_resource_worker:async_query(ResId, Request, Opts)
        {ok, _Group, #{query_mode := QM, mod := Module}} ->
            IsBufferSupported = is_buffer_supported(Module),
            case {IsBufferSupported, QM} of
                {true, _} ->
                    emqx_resource_worker:simple_sync_query(ResId, Request);
                {false, sync} ->
                    emqx_resource_worker:sync_query(ResId, Request, Opts);
                {false, async} ->
                    emqx_resource_worker:async_query(ResId, Request, Opts)
            end;
        {error, not_found} ->
            ?RESOURCE_ERROR(not_found, "resource not found")

@@ -336,6 +345,15 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group).
get_callback_mode(Mod) ->
    Mod:callback_mode().

-spec is_buffer_supported(module()) -> boolean().
is_buffer_supported(Module) ->
    try
        Module:is_buffer_supported()
    catch
        _:_ ->
            false
    end.

-spec call_start(manager_id(), module(), resource_config()) ->
    {ok, resource_state()} | {error, Reason :: term()}.
call_start(MgrId, Mod, Config) ->
@@ -146,14 +146,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) ->
        ],
        [matched]
    ),
    case emqx_resource:is_buffer_supported(ResourceType) of
        true ->
            %% the resource itself supports
            %% buffer, so there is no need for resource workers
            ok;
        false ->
            ok = emqx_resource_worker_sup:start_workers(ResId, Opts),
            case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of
                true ->
                    wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT));
                false ->
                    ok
            end,
    ok.
            end
    end.

%% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance.
%%
@@ -1,2 +1,3 @@
mongo
mongo_rs_sharded
kafka
@@ -0,0 +1,471 @@
emqx_ee_bridge_kafka {
  config_enable {
    desc {
      en: "Enable (true) or disable (false) this Kafka bridge."
      zh: "启用(true)或停用该(false)Kafka 数据桥接。"
    }
    label {
      en: "Enable or Disable"
      zh: "启用或停用"
    }
  }
  desc_config {
    desc {
      en: """Configuration for a Kafka bridge."""
      zh: """Kafka 桥接配置"""
    }
    label {
      en: "Kafka Bridge Configuration"
      zh: "Kafka 桥接配置"
    }
  }
  desc_type {
    desc {
      en: """The Bridge Type"""
      zh: """桥接类型"""
    }
    label {
      en: "Bridge Type"
      zh: "桥接类型"
    }
  }
  desc_name {
    desc {
      en: """Bridge name, used as a human-readable description of the bridge."""
      zh: """桥接名字,可读描述"""
    }
    label {
      en: "Bridge Name"
      zh: "桥接名字"
    }
  }
  producer_opts {
    desc {
      en: "Local MQTT data source and Kafka bridge configs."
      zh: "本地 MQTT 数据源和 Kafka 桥接的配置。"
    }
    label {
      en: "MQTT to Kafka"
      zh: "MQTT 到 Kafka"
    }
  }
  producer_mqtt_opts {
    desc {
      en: "MQTT data source. Optional when used as a rule-engine action."
      zh: "需要桥接到 MQTT 源主题。"
    }
    label {
      en: "MQTT Source Topic"
      zh: "MQTT 源主题"
    }
  }
  mqtt_topic {
    desc {
      en: "MQTT topic or topic filter as data source (bridge input)."
      zh: "指定 MQTT 主题作为桥接的数据源"
    }
    label {
      en: "Source MQTT Topic"
      zh: "源 MQTT 主题"
    }
  }
  producer_kafka_opts {
    desc {
      en: "Kafka producer configs."
      zh: "Kafka 生产者参数。"
    }
    label {
      en: "Kafka Producer"
      zh: "生产者参数"
    }
  }
  bootstrap_hosts {
    desc {
      en: "A comma separated list of Kafka <code>host:port</code> endpoints to bootstrap the client."
      zh: "用逗号分隔的 <code>host:port</code> 主机列表。"
    }
    label {
      en: "Bootstrap Hosts"
      zh: "主机列表"
    }
  }
  connect_timeout {
    desc {
      en: "Maximum wait time for TCP connection establishment (including authentication time if enabled)."
      zh: "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。"
    }
    label {
      en: "Connect Timeout"
      zh: "连接超时"
    }
  }
  min_metadata_refresh_interval {
    desc {
      en: "Minimum time interval the client has to wait before refreshing Kafka broker and topic metadata. "
          "Setting too small a value may add extra load on Kafka."
      zh: "刷新 Kafka broker 和 Kafka 主题元数据段最短时间间隔。设置太小可能会增加 Kafka 压力。"
    }
    label {
      en: "Min Metadata Refresh Interval"
      zh: "元数据刷新最小间隔"
    }
  }
  metadata_request_timeout {
    desc {
      en: "Maximum wait time when fetching metadata from Kafka."
      zh: "刷新元数据时最大等待时长。"
    }
    label {
      en: "Metadata Request Timeout"
      zh: "元数据请求超时"
    }
  }
  authentication {
    desc {
      en: "Authentication configs."
      zh: "认证参数。"
    }
    label {
      en: "Authentication"
      zh: "认证"
    }
  }
  socket_opts {
    desc {
      en: "Extra socket options."
      zh: "更多 Socket 参数设置。"
    }
    label {
      en: "Socket Options"
      zh: "Socket 参数"
    }
  }
  auth_sasl_mechanism {
    desc {
      en: "SASL authentication mechanism."
      zh: "SASL 认证方法名称。"
    }
    label {
      en: "Mechanism"
      zh: "认证方法"
    }
  }
  auth_sasl_username {
    desc {
      en: "SASL authentication username."
      zh: "SASL 认证的用户名。"
    }
    label {
      en: "Username"
      zh: "用户名"
    }
  }
  auth_sasl_password {
    desc {
      en: "SASL authentication password."
      zh: "SASL 认证的密码。"
    }
    label {
      en: "Password"
      zh: "密码"
    }
  }
  auth_kerberos_principal {
    desc {
      en: "SASL GSSAPI authentication Kerberos principal. "
          "For example <code>client_name@MY.KERBEROS.REALM.MYDOMAIN.COM</code>, "
          "NOTE: The realm in use has to be configured in /etc/krb5.conf in EMQX nodes."
      zh: "SASL GSSAPI 认证方法的 Kerberos principal,"
          "例如 <code>client_name@MY.KERBEROS.REALM.MYDOMAIN.COM</code>"
          "注意:这里使用的 realm 需要配置在 EMQX 服务器的 /etc/krb5.conf 中"
    }
    label {
      en: "Kerberos Principal"
      zh: "Kerberos Principal"
    }
  }
  auth_kerberos_keytab_file {
    desc {
      en: "SASL GSSAPI authentication Kerberos keytab file path. "
          "NOTE: This file has to be placed in EMQX nodes, and the EMQX service runner user requires read permission."
      zh: "SASL GSSAPI 认证方法的 Kerberos keytab 文件。"
          "注意:该文件需要上传到 EMQX 服务器中,且运行 EMQX 服务的系统账户需要有读取权限。"
    }
    label {
      en: "Kerberos keytab file"
      zh: "Kerberos keytab 文件"
    }
  }
  socket_send_buffer {
    desc {
      en: "Fine tune the socket send buffer. The default value is tuned for high throughput."
      zh: "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。"
    }
    label {
      en: "Socket Send Buffer Size"
      zh: "Socket 发送缓存大小"
    }
  }
  socket_receive_buffer {
    desc {
      en: "Fine tune the socket receive buffer. The default value is tuned for high throughput."
      zh: "TCP socket 的收包缓存调优。默认值是针对高吞吐量的一个推荐值。"
    }
    label {
      en: "Socket Receive Buffer Size"
      zh: "Socket 收包缓存大小"
    }
  }
  socket_nodelay {
    desc {
      en: "When set to 'true', the TCP buffer is sent as soon as possible. "
          "Otherwise the OS kernel may buffer small TCP packets for a while (40ms by default)."
      zh: "设置 ‘true' 让系统内核立即发送。否则当需要发送当内容很少时,可能会有一定延迟(默认 40 毫秒)。"
    }
    label {
      en: "No Delay"
      zh: "是否延迟发送"
    }
  }
  kafka_topic {
    desc {
      en: "Kafka topic name"
      zh: "Kafka 主题名称"
    }
    label {
      en: "Kafka Topic Name"
      zh: "Kafka 主题名称"
    }
  }
  kafka_message {
    desc {
      en: "Template to render a Kafka message."
      zh: "用于生成 Kafka 消息的模版。"
    }
    label {
      en: "Kafka Message Template"
      zh: "Kafka 消息模版"
    }
  }
  kafka_message_key {
    desc {
      en: "Template to render Kafka message key. "
          "If the desired variable for this template is not found in the input data "
          "<code>NULL</code> is used."
      zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
    }
    label {
      en: "Message Key"
      zh: "消息的 Key"
    }
  }
  kafka_message_value {
    desc {
      en: "Template to render Kafka message value. "
          "If the desired variable for this template is not found in the input data "
          "<code>NULL</code> is used."
      zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 <code>NULL</code>。"
    }
    label {
      en: "Message Value"
      zh: "消息的 Value"
    }
  }
  kafka_message_timestamp {
    desc {
      en: "Which timestamp to use. "
          "The timestamp is expected to be a millisecond precision Unix epoch "
          "which can be in string format, e.g. <code>1661326462115</code> or "
          "<code>'1661326462115'</code>. "
          "When the desired data field for this template is not found, "
          "or if the found data is not a valid integer, "
          "the current system timestamp will be used."
      zh: "生成 Kafka 消息时间戳的模版。"
          "该时间必需是一个整型数值(可以是字符串格式)例如 <code>1661326462115</code> "
          "或 <code>'1661326462115'</code>。"
          "当所需的输入字段不存在,或不是一个整型时,"
          "则会使用当前系统时间。"
    }
    label {
      en: "Message Timestamp"
      zh: "消息的时间戳"
    }
  }
  max_batch_bytes {
    desc {
      en: "Maximum bytes to collect in a Kafka message batch. "
          "Most of the Kafka brokers default to a limit of 1MB batch size. "
          "EMQX's default value is less than 1MB in order to compensate for "
          "Kafka message encoding overheads (especially when each individual message is very small). "
          "When a single message is over the limit, it is still sent (as a single element batch)."
      zh: "最大消息批量字节数。"
          "大多数 Kafka 环境的默认最低值是 1MB,EMQX 的默认值比 1MB 更小是因为需要"
          "补偿 Kafka 消息编码索需要的额外字节(尤其是当每条消息都很小的情况下)。"
          "当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。"
    }
    label {
      en: "Max Batch Bytes"
      zh: "最大批量字节数"
    }
  }
  compression {
    desc {
      en: "Compression method."
      zh: "压缩方法。"
    }
    label {
      en: "Compression"
      zh: "压缩"
    }
  }
  partition_strategy {
    desc {
      en: "Partition strategy tells the producer how to dispatch messages to Kafka partitions.\n\n"
          "<code>random</code>: Randomly pick a partition for each message\n"
          "<code>key_dispatch</code>: Hash Kafka message key to a partition number\n"
      zh: "设置消息发布时应该如何选择 Kafka 分区。\n\n"
          "<code>random</code>: 为每个消息随机选择一个分区。\n"
          "<code>key_dispatch</code>: Hash Kafka message key to a partition number\n"
    }
    label {
      en: "Partition Strategy"
      zh: "分区选择策略"
    }
  }
  required_acks {
    desc {
      en: "The number of acknowledgements the Kafka partition leader has to wait for from its followers "
          "before it sends back the acknowledgement to the EMQX Kafka producer\n\n"
          "<code>all_isr</code>: Require all in-sync replicas to acknowledge.\n"
          "<code>leader_only</code>: Require only the partition-leader's acknowledgement.\n"
          "<code>none</code>: No need for Kafka to acknowledge at all.\n"
      zh: "设置 Kafka leader 在返回给 EMQX 确认之前需要等待多少个 follower 的确认。\n\n"
          "<code>all_isr</code>: 需要所有的在线复制者都确认。\n"
          "<code>leader_only</code>: 仅需要分区 leader 确认。\n"
          "<code>none</code>: 无需 Kafka 回复任何确认。\n"
    }
    label {
      en: "Required Acks"
      zh: "Kafka 确认数量"
    }
  }
  partition_count_refresh_interval {
    desc {
      en: "The time interval for Kafka producer to discover increased number of partitions.\n"
          "After the number of partitions is increased in Kafka, EMQX will start taking the \n"
          "discovered partitions into account when dispatching messages per <code>partition_strategy</code>."
      zh: "配置 Kafka 刷新分区数量的时间间隔。\n"
          "EMQX 发现 Kafka 分区数量增加后,会开始按 <code>partition_strategy</code> 配置,把消息发送到新的分区中。"
    }
    label {
      en: "Partition Count Refresh Interval"
      zh: "分区数量刷新间隔"
    }
  }
  max_inflight {
    desc {
      en: "Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. "
          "A greater value typically means better throughput. However, there can be a risk of message reordering when this "
          "value is greater than 1."
      zh: "设置 Kafka 生产者(每个分区一个)在收到 Kafka 的确认前最多发送多少个请求(批量)。"
          "调大这个值通常可以增加吞吐量,但是,当该值设置大于 1 是存在消息乱序的风险。"
    }
    label {
      en: "Max Inflight"
      zh: "飞行窗口"
    }
  }
  producer_buffer {
    desc {
      en: "Configure producer message buffer.\n\n"
          "Tell Kafka producer how to buffer messages when EMQX has more messages to send than "
          "Kafka can keep up with, or when Kafka is down.\n\n"
      zh: "配置消息缓存的相关参数。\n\n"
          "当 EMQX 需要发送的消息超过 Kafka 处理能力,或者当 Kafka 临时下线时,EMQX 内部会将消息缓存起来。"
    }
    label {
      en: "Message Buffer"
      zh: "消息缓存"
    }
  }
  buffer_mode {
    desc {
      en: "Message buffer mode.\n\n"
          "<code>memory</code>: Buffer all messages in memory. The messages will be lost in case of EMQX node restart\n"
          "<code>disc</code>: Buffer all messages on disk. The messages on disk are able to survive EMQX node restart.\n"
          "<code>hybrid</code>: Buffer messages in memory first; when up to a certain limit "
          "(see <code>segment_bytes</code> config for more information), start offloading "
          "messages to disk. Like <code>memory</code> mode, the messages will be lost in case of "
          "EMQX node restart."
      zh: "消息缓存模式。\n"
          "<code>memory</code>: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n"
          "<code>disc</code>: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n"
          "<code>hybrid</code>: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制"
          "(配置项 <code>segment_bytes</code> 描述了该限制)后,后续的消息会缓存到磁盘上。"
          "与 <code>memory</code> 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。"
    }
    label {
      en: "Buffer Mode"
      zh: "缓存模式"
    }
  }
  buffer_per_partition_limit {
    desc {
      en: "Number of bytes allowed to buffer for each Kafka partition. "
          "When this limit is exceeded, old messages will be dropped to make room "
          "for new messages to be buffered."
      zh: "为每个 Kafka 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃,"
          "为新的消息腾出空间。"
    }
    label {
      en: "Per-partition Buffer Limit"
      zh: "Kafka 分区缓存上限"
    }
  }
  buffer_segment_bytes {
    desc {
      en: "Applicable when buffer mode is set to <code>disk</code> or <code>hybrid</code>.\n"
          "This value is to specify the size of each on-disk buffer file."
      zh: "当缓存模式是 <code>disk</code> 或 <code>hybrid</code> 时适用。"
          "该配置用于指定缓存到磁盘上的文件的大小。"
    }
    label {
      en: "Segment File Bytes"
      zh: "缓存文件大小"
    }
  }
  buffer_memory_overload_protection {
    desc {
      en: "Applicable when buffer mode is set to <code>memory</code> or <code>hybrid</code>.\n"
          "EMQX will drop old cached messages under high memory pressure. "
          "The high memory threshold is defined in config <code>sysmon.os.sysmem_high_watermark</code>."
      zh: "缓存模式是 <code>memory</code> 或 <code>hybrid</code> 时适用。"
          "当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。"
          "内存压力值由配置项 <code>sysmon.os.sysmem_high_watermark</code> 决定。"
    }
    label {
      en: "Memory Overload Protection"
      zh: "内存过载保护"
    }
  }
  auth_username_password {
    desc {
      en: "Username/password based authentication."
      zh: "基于用户名密码的认证。"
    }
    label {
      en: "Username/password Auth"
      zh: "用户名密码认证"
    }
  }
  auth_gssapi_kerberos {
    desc {
      en: "Use GSSAPI/Kerberos authentication."
      zh: "使用 GSSAPI/Kerberos 认证。"
    }
    label {
      en: "GSSAPI/Kerberos"
      zh: "GSSAPI/Kerberos"
    }
  }
}
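The fields described above correspond to the "config" schema of `emqx_ee_bridge_kafka` added later in this commit. As a rough sketch (not part of the commit itself; the bridge name `my_kafka` and the broker address are made up, and the `bridges.kafka.<name>` layout is assumed from the `fields(bridges)` mapping in `emqx_ee_bridge`), a producer bridge using these options could look like this in HOCON:

    # hypothetical example config, not taken from the commit
    bridges.kafka.my_kafka {
      enable = true
      bootstrap_hosts = "kafka-1.emqx.net:9092"
      authentication = none
      producer {
        mqtt {
          topic = "t/#"
        }
        kafka {
          topic = "test-topic-one-partition"
          message {
            key = "${clientid}"
            value = "${payload}"
            timestamp = "${timestamp}"
          }
          max_batch_bytes = "896KB"
          compression = no_compression
          partition_strategy = random
          required_acks = all_isr
          max_inflight = 10
          buffer {
            mode = memory
            per_partition_limit = "2GB"
            segment_bytes = "100MB"
            memory_overload_protection = true
          }
        }
      }
    }

The values shown for batching, acks, inflight and buffering are the defaults declared in the schema below, so they could be omitted; `bootstrap_hosts` and the Kafka `topic` are the required pieces.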
@@ -1,5 +1,9 @@
{erl_opts, [debug_info]}.
{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}}
       , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}}
       , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}}
       , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}}
       , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}}
       , {emqx_connector, {path, "../../apps/emqx_connector"}}
       , {emqx_resource, {path, "../../apps/emqx_resource"}}
       , {emqx_bridge, {path, "../../apps/emqx_bridge"}}
@@ -3,7 +3,8 @@
    {registered, []},
    {applications, [
        kernel,
        stdlib
        stdlib,
        emqx_ee_connector
    ]},
    {env, []},
    {modules, []},
@@ -14,6 +14,7 @@
api_schemas(Method) ->
    [
        ref(emqx_ee_bridge_kafka, Method),
        ref(emqx_ee_bridge_mysql, Method),
        ref(emqx_ee_bridge_mongodb, Method ++ "_rs"),
        ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"),

@@ -26,6 +27,7 @@ api_schemas(Method) ->
schema_modules() ->
    [
        emqx_ee_bridge_kafka,
        emqx_ee_bridge_hstreamdb,
        emqx_ee_bridge_influxdb,
        emqx_ee_bridge_mongodb,

@@ -45,6 +47,7 @@ examples(Method) ->
    lists:foldl(Fun, #{}, schema_modules()).

resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8));
resource_type(kafka) -> emqx_bridge_impl_kafka;
resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb;
resource_type(mongodb_rs) -> emqx_connector_mongo;
resource_type(mongodb_sharded) -> emqx_connector_mongo;

@@ -56,6 +59,11 @@ resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb.
fields(bridges) ->
    [
        {kafka,
            mk(
                hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")),
                #{desc => <<"EMQX Enterprise Config">>}
            )},
        {hstreamdb,
            mk(
                hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")),

@@ -66,8 +74,9 @@ fields(bridges) ->
                hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")),
                #{desc => <<"EMQX Enterprise Config">>}
            )}
    ] ++ fields(mongodb) ++ fields(influxdb);
fields(mongodb) ->
    ] ++ mongodb_structs() ++ influxdb_structs().

mongodb_structs() ->
    [
        {Type,
            mk(

@@ -75,8 +84,9 @@ fields(mongodb) ->
                #{desc => <<"EMQX Enterprise Config">>}
            )}
        || Type <- [mongodb_rs, mongodb_sharded, mongodb_single]
    ];
fields(influxdb) ->
    ].

influxdb_structs() ->
    [
        {Protocol,
            mk(
@@ -0,0 +1,260 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_bridge_kafka).

-include_lib("emqx_bridge/include/emqx_bridge.hrl").
-include_lib("emqx_connector/include/emqx_connector.hrl").
-include_lib("typerefl/include/types.hrl").
-include_lib("hocon/include/hoconsc.hrl").

%% allow atoms like scram_sha_256 and scram_sha_512
%% i.e. the _256 part does not start with a-z
-elvis([
    {elvis_style, atom_naming_convention, #{
        regex => "^([a-z][a-z0-9]*_?)([a-z0-9]*_?)*$",
        enclosed_atoms => ".*"
    }}
]).
-import(hoconsc, [mk/2, enum/1, ref/2]).

-export([
    conn_bridge_examples/1
]).

-export([
    namespace/0,
    roots/0,
    fields/1,
    desc/1
]).

%% -------------------------------------------------------------------------------------------------
%% api

conn_bridge_examples(Method) ->
    [
        #{
            <<"kafka">> => #{
                summary => <<"Kafka Bridge">>,
                value => values(Method)
            }
        }
    ].

values(get) ->
    maps:merge(values(post), ?METRICS_EXAMPLE);
values(post) ->
    #{
        bootstrap_hosts => <<"localhost:9092">>
    };
values(put) ->
    values(post).

%% -------------------------------------------------------------------------------------------------
%% Hocon Schema Definitions

namespace() -> "bridge_kafka".

roots() -> ["config"].

fields("post") ->
    [type_field(), name_field() | fields("config")];
fields("put") ->
    fields("config");
fields("get") ->
    emqx_bridge_schema:metrics_status_fields() ++ fields("post");
fields("config") ->
    [
        {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})},
        {bootstrap_hosts, mk(binary(), #{required => true, desc => ?DESC(bootstrap_hosts)})},
        {connect_timeout,
            mk(emqx_schema:duration_ms(), #{
                default => "5s",
                desc => ?DESC(connect_timeout)
            })},
        {min_metadata_refresh_interval,
            mk(
                emqx_schema:duration_ms(),
                #{
                    default => "3s",
                    desc => ?DESC(min_metadata_refresh_interval)
                }
            )},
        {metadata_request_timeout,
            mk(emqx_schema:duration_ms(), #{
                default => "5s",
                desc => ?DESC(metadata_request_timeout)
            })},
        {authentication,
            mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{
                default => none, desc => ?DESC("authentication")
            })},
        {producer, mk(hoconsc:union([none, ref(producer_opts)]), #{desc => ?DESC(producer_opts)})},
        %{consumer, mk(hoconsc:union([none, ref(consumer_opts)]), #{desc => ?DESC(consumer_opts)})},
        {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})}
    ] ++ emqx_connector_schema_lib:ssl_fields();
fields(auth_username_password) ->
    [
        {mechanism,
            mk(enum([plain, scram_sha_256, scram_sha_512]), #{
                required => true, desc => ?DESC(auth_sasl_mechanism)
            })},
        {username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})},
        {password,
            mk(binary(), #{required => true, sensitive => true, desc => ?DESC(auth_sasl_password)})}
    ];
fields(auth_gssapi_kerberos) ->
    [
        {kerberos_principal,
            mk(binary(), #{
                required => true,
                desc => ?DESC(auth_kerberos_principal)
            })},
        {kerberos_keytab_file,
            mk(binary(), #{
                required => true,
                desc => ?DESC(auth_kerberos_keytab_file)
            })}
    ];
fields(socket_opts) ->
    [
        {sndbuf,
            mk(
                emqx_schema:bytesize(),
                #{default => "1024KB", desc => ?DESC(socket_send_buffer)}
            )},
        {recbuf,
            mk(
                emqx_schema:bytesize(),
                #{default => "1024KB", desc => ?DESC(socket_receive_buffer)}
            )},
        {nodelay,
            mk(
                boolean(),
                #{default => true, desc => ?DESC(socket_nodelay)}
            )}
    ];
fields(producer_opts) ->
    [
        {mqtt, mk(ref(producer_mqtt_opts), #{desc => ?DESC(producer_mqtt_opts)})},
        {kafka,
            mk(ref(producer_kafka_opts), #{
                required => true,
                desc => ?DESC(producer_kafka_opts)
            })}
    ];
fields(producer_mqtt_opts) ->
    [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}];
fields(producer_kafka_opts) ->
    [
        {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})},
        {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})},
        {max_batch_bytes,
            mk(emqx_schema:bytesize(), #{default => "896KB", desc => ?DESC(max_batch_bytes)})},
        {compression,
            mk(enum([no_compression, snappy, gzip]), #{
                default => no_compression, desc => ?DESC(compression)
            })},
        {partition_strategy,
            mk(
                enum([random, key_dispatch]),
                #{default => random, desc => ?DESC(partition_strategy)}
            )},
        {required_acks,
            mk(
                enum([all_isr, leader_only, none]),
                #{
                    default => all_isr,
                    desc => ?DESC(required_acks)
                }
            )},
        {partition_count_refresh_interval,
            mk(
                emqx_schema:duration_s(),
                #{
                    default => "60s",
                    desc => ?DESC(partition_count_refresh_interval)
                }
            )},
        {max_inflight,
            mk(
                pos_integer(),
                #{
                    default => 10,
                    desc => ?DESC(max_inflight)
                }
            )},
        {buffer,
            mk(ref(producer_buffer), #{
                required => false,
                desc => ?DESC(producer_buffer)
            })}
    ];
fields(kafka_message) ->
    [
        {key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})},
        {value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})},
        {timestamp,
            mk(string(), #{
                default => "${timestamp}", desc => ?DESC(kafka_message_timestamp)
            })}
    ];
fields(producer_buffer) ->
    [
        {mode,
            mk(
                enum([memory, disk, hybrid]),
                #{default => memory, desc => ?DESC(buffer_mode)}
            )},
        {per_partition_limit,
            mk(
                emqx_schema:bytesize(),
                #{default => "2GB", desc => ?DESC(buffer_per_partition_limit)}
            )},
        {segment_bytes,
            mk(
                emqx_schema:bytesize(),
                #{default => "100MB", desc => ?DESC(buffer_segment_bytes)}
            )},
        {memory_overload_protection,
            mk(boolean(), #{
                %% different from 4.x
                default => true,
                desc => ?DESC(buffer_memory_overload_protection)
            })}
    ].

% fields(consumer_opts) ->
%     [
%         {kafka, mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})},
%         {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})}
%     ];
% fields(consumer_mqtt_opts) ->
%     [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
%     ];

% fields(consumer_mqtt_opts) ->
%     [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})}
%     ];
% fields(consumer_kafka_opts) ->
%     [ {topic, mk(string(), #{desc => ?DESC(consumer_kafka_topic)})}
%     ].

desc("config") ->
    ?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
    ["Configuration for Kafka using `", string:to_upper(Method), "` method."];
desc(_) ->
    undefined.

%% -------------------------------------------------------------------------------------------------
%% internal
type_field() ->
    {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}.

name_field() ->
    {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}.

ref(Name) ->
    hoconsc:ref(?MODULE, Name).
@@ -0,0 +1,33 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

%% Kafka connection configuration
-module(emqx_bridge_impl_kafka).
-behaviour(emqx_resource).

%% callbacks of behaviour emqx_resource
-export([
    callback_mode/0,
    on_start/2,
    on_stop/2,
    on_query/3,
    on_get_status/2,
    is_buffer_supported/0
]).

is_buffer_supported() -> true.

callback_mode() -> async_if_possible.

on_start(InstId, Config) ->
    emqx_bridge_impl_kafka_producer:on_start(InstId, Config).

on_stop(InstId, State) ->
    emqx_bridge_impl_kafka_producer:on_stop(InstId, State).

on_query(InstId, Msg, State) ->
    emqx_bridge_impl_kafka_producer:on_query(InstId, Msg, State).

on_get_status(InstId, State) ->
    emqx_bridge_impl_kafka_producer:on_get_status(InstId, State).
@@ -0,0 +1,270 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_bridge_impl_kafka_producer).

%% callbacks of behaviour emqx_resource
-export([
    callback_mode/0,
    on_start/2,
    on_stop/2,
    on_query/3,
    on_get_status/2
]).

-export([on_kafka_ack/3]).

-include_lib("emqx/include/logger.hrl").

callback_mode() -> async_if_possible.

%% @doc Config schema is defined in emqx_ee_bridge_kafka.
on_start(InstId, Config) ->
    #{
        bridge_name := BridgeName,
        bootstrap_hosts := Hosts0,
        connect_timeout := ConnTimeout,
        metadata_request_timeout := MetaReqTimeout,
        min_metadata_refresh_interval := MinMetaRefreshInterval,
        socket_opts := SocketOpts,
        authentication := Auth,
        ssl := SSL
    } = Config,
    %% it's a bug if producer config is not found
    %% the caller should not try to start a producer if
    %% there is no producer config
    ProducerConfigWrapper = get_required(producer, Config, no_kafka_producer_config),
    ProducerConfig = get_required(kafka, ProducerConfigWrapper, no_kafka_producer_parameters),
    MessageTemplate = get_required(message, ProducerConfig, no_kafka_message_template),
    Hosts = hosts(Hosts0),
    ClientId = make_client_id(BridgeName),
    ClientConfig = #{
        min_metadata_refresh_interval => MinMetaRefreshInterval,
        connect_timeout => ConnTimeout,
        client_id => ClientId,
        request_timeout => MetaReqTimeout,
        extra_sock_opts => socket_opts(SocketOpts),
        sasl => sasl(Auth),
        ssl => ssl(SSL)
    },
    #{
        topic := KafkaTopic
    } = ProducerConfig,
    case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of
        {ok, _} ->
            ?SLOG(info, #{
                msg => "kafka_client_started",
                instance_id => InstId,
                kafka_hosts => Hosts
            });
        {error, Reason} ->
            ?SLOG(error, #{
                msg => "failed_to_start_kafka_client",
                instance_id => InstId,
                kafka_hosts => Hosts,
                reason => Reason
            }),
            throw(failed_to_start_kafka_client)
    end,
    WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig),
    case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of
        {ok, Producers} ->
            {ok, #{
                message_template => compile_message_template(MessageTemplate),
                client_id => ClientId,
                producers => Producers
            }};
        {error, Reason2} ->
            ?SLOG(error, #{
                msg => "failed_to_start_kafka_producer",
                instance_id => InstId,
                kafka_hosts => Hosts,
                kafka_topic => KafkaTopic,
                reason => Reason2
            }),
            throw(failed_to_start_kafka_producer)
    end.

on_stop(_InstId, #{client_id := ClientID, producers := Producers}) ->
    with_log_at_error(
        fun() -> wolff:stop_and_delete_supervised_producers(Producers) end,
        #{
            msg => "failed_to_delete_kafka_producer",
            client_id => ClientID
        }
    ),
    with_log_at_error(
        fun() -> wolff:stop_and_delete_supervised_client(ClientID) end,
        #{
            msg => "failed_to_delete_kafka_client",
            client_id => ClientID
        }
    ).

%% @doc The callback API for rule-engine (or bridge without rules)
%% The input argument `Message' is an enriched format (as a map())
%% of the original #message{} record.
%% The enrichment is done by rule-engine or by the data bridge framework.
%% E.g. the output of rule-engine process chain
%% or the direct mapping from an MQTT message.
on_query(_InstId, {send_message, Message}, #{message_template := Template, producers := Producers}) ->
    KafkaMessage = render_message(Template, Message),
    %% The returned information is discarded here.
    %% If the producer process is down when sending, this function would
    %% raise an error exception which is to be caught by the caller of this callback
    {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}),
    ok.

compile_message_template(#{
    key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate
}) ->
    #{
        key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate),
        value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate),
        timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate)
    }.

render_message(
    #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message
) ->
    #{
        key => render(KeyTemplate, Message),
        value => render(ValueTemplate, Message),
        ts => render_timestamp(TimestampTemplate, Message)
    }.

render(Template, Message) ->
    emqx_plugin_libs_rule:proc_tmpl(Template, Message).

render_timestamp(Template, Message) ->
    try
        binary_to_integer(render(Template, Message))
    catch
        _:_ ->
            erlang:system_time(millisecond)
    end.

on_kafka_ack(_Partition, _Offset, _Extra) ->
    %% Do nothing so far.
    %% Maybe need to bump some counters?
    ok.

on_get_status(_InstId, _State) ->
    connected.

%% Parse comma separated host:port list into a [{Host,Port}] list
hosts(Hosts) when is_binary(Hosts) ->
    hosts(binary_to_list(Hosts));
hosts(Hosts) when is_list(Hosts) ->
    kpro:parse_endpoints(Hosts).

%% Extra socket options, such as sndbuf size etc.
socket_opts(Opts) when is_map(Opts) ->
    socket_opts(maps:to_list(Opts));
socket_opts(Opts) when is_list(Opts) ->
    socket_opts_loop(Opts, []).

socket_opts_loop([], Acc) ->
    lists:reverse(Acc);
socket_opts_loop([{T, Bytes} | Rest], Acc) when
    T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer
->
    Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)],
    socket_opts_loop(Rest, Acc1);
socket_opts_loop([Other | Rest], Acc) ->
    socket_opts_loop(Rest, [Other | Acc]).

%% https://www.erlang.org/doc/man/inet.html
%% For TCP it is recommended to have val(buffer) >= val(recbuf)
%% to avoid performance issues because of unnecessary copying.
adjust_socket_buffer(Bytes, Opts) ->
    case lists:keytake(buffer, 1, Opts) of
        false ->
            [{buffer, Bytes} | Opts];
        {value, {buffer, Bytes1}, Acc1} ->
            [{buffer, max(Bytes1, Bytes)} | Acc1]
    end.

sasl(none) ->
    undefined;
sasl(#{mechanism := Mechanism, username := Username, password := Password}) ->
    {Mechanism, Username, emqx_secret:wrap(Password)};
sasl(#{
    kerberos_principal := Principal,
    kerberos_keytab_file := KeyTabFile
}) ->
    {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}.

ssl(#{enable := true} = SSL) ->
    emqx_tls_lib:to_client_opts(SSL);
ssl(_) ->
    [].

producers_config(BridgeName, ClientId, Input) ->
    #{
        max_batch_bytes := MaxBatchBytes,
        compression := Compression,
        partition_strategy := PartitionStrategy,
        required_acks := RequiredAcks,
        partition_count_refresh_interval := PCntRefreshInterval,
        max_inflight := MaxInflight,
        buffer := #{
            mode := BufferMode,
            per_partition_limit := PerPartitionLimit,
            segment_bytes := SegmentBytes,
            memory_overload_protection := MemOLP
        }
    } = Input,

    {OffloadMode, ReplayqDir} =
        case BufferMode of
            memory -> {false, false};
            disk -> {false, replayq_dir(ClientId)};
            hybrid -> {true, replayq_dir(ClientId)}
        end,
    #{
        name => make_producer_name(BridgeName),
        partitioner => PartitionStrategy,
        partition_count_refresh_interval_seconds => PCntRefreshInterval,
        replayq_dir => ReplayqDir,
        replayq_offload_mode => OffloadMode,
        replayq_max_total_bytes => PerPartitionLimit,
        replayq_seg_bytes => SegmentBytes,
        drop_if_highmem => MemOLP,
        required_acks => RequiredAcks,
        max_batch_bytes => MaxBatchBytes,
        max_send_ahead => MaxInflight - 1,
        compression => Compression
    }.

replayq_dir(ClientId) ->
    filename:join([emqx:data_dir(), "kafka", ClientId]).

%% Client ID is better to be unique to make it easier for Kafka side troubleshooting.
make_client_id(BridgeName) when is_atom(BridgeName) ->
    make_client_id(atom_to_list(BridgeName));
make_client_id(BridgeName) ->
    iolist_to_binary([BridgeName, ":", atom_to_list(node())]).

%% Producer name must be an atom which will be used as a ETS table name for
%% partition worker lookup.
make_producer_name(BridgeName) when is_atom(BridgeName) ->
    make_producer_name(atom_to_list(BridgeName));
make_producer_name(BridgeName) ->
    list_to_atom("kafka_producer_" ++ BridgeName).

with_log_at_error(Fun, Log) ->
    try
        Fun()
    catch
        C:E ->
            ?SLOG(error, Log#{
                exception => C,
                reason => E
            })
    end.

get_required(Field, Config, Throw) ->
    Value = maps:get(Field, Config, none),
    Value =:= none andalso throw(Throw),
    Value.
@@ -0,0 +1,90 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------

-module(emqx_bridge_impl_kafka_producer_SUITE).

-compile(nowarn_export_all).
-compile(export_all).

-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("brod/include/brod.hrl").

-define(PRODUCER, emqx_bridge_impl_kafka).

%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------

all() ->
    emqx_common_test_helpers:all(?MODULE).

init_per_suite(Config) ->
    {ok, _} = application:ensure_all_started(brod),
    {ok, _} = application:ensure_all_started(wolff),
    Config.

end_per_suite(_) ->
    ok.

t_publish(_CtConfig) ->
    KafkaTopic = "test-topic-one-partition",
    Conf = config(#{
        "kafka_hosts_string" => kafka_hosts_string(),
        "kafka_topic" => KafkaTopic
    }),
    InstId = <<"InstanceID">>,
    Time = erlang:system_time(millisecond),
    BinTime = integer_to_binary(Time),
    Msg = #{
        clientid => BinTime,
        payload => <<"payload">>,
        timestamp => Time
    },
    {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0),
    ct:pal("base offset before testing ~p", [Offset]),
    {ok, State} = ?PRODUCER:on_start(InstId, Conf),
    ok = ?PRODUCER:on_query(InstId, {send_message, Msg}, State),
    {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset),
    ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg),
    ok = ?PRODUCER:on_stop(InstId, State),
    ok.

config(Args) ->
    {ok, Conf} = hocon:binary(hocon_config(Args)),
    #{config := Parsed} = hocon_tconf:check_plain(
        emqx_ee_bridge_kafka,
        #{<<"config">> => Conf},
        #{atom_key => true}
    ),
    Parsed#{bridge_name => "testbridge"}.

hocon_config(Args) ->
    Hocon = bbmustache:render(iolist_to_binary(hocon_config_template()), Args),
    Hocon.

%% erlfmt-ignore
hocon_config_template() ->
"""
bootstrap_hosts = \"{{ kafka_hosts_string }}\"
enable = true
authentication = none
producer = {
    mqtt {
        topic = \"t/#\"
    }
    kafka = {
        topic = \"{{ kafka_topic }}\"
    }
}
""".

kafka_hosts_string() ->
    "kafka-1.emqx.net:9092,".

kafka_hosts() ->
    kpro:parse_endpoints(kafka_hosts_string()).

resolve_kafka_offset(Hosts, Topic, Partition) ->
    brod:resolve_offset(Hosts, Topic, Partition, latest).
@@ -5,7 +5,9 @@
        kernel,
        stdlib,
        hstreamdb_erl,
        influxdb
        influxdb,
        wolff,
        brod
    ]},
    {env, []},
    {modules, []},
mix.exs
@@ -44,7 +44,7 @@ defmodule EMQXUmbrella.MixProject do
      # we need several overrides here because dependencies specify
      # other exact versions, and not ranges.
      [
        {:lc, github: "emqx/lc", tag: "0.3.1"},
        {:lc, github: "emqx/lc", tag: "0.3.2", override: true},
        {:redbug, "2.0.7"},
        {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true},
        {:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true},

@@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do
        {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true},
        {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true},
        {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true},
        {:replayq, "0.3.4", override: true},
        {:replayq, github: "emqx/replayq", tag: "0.3.4", override: true},
        {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true},
        {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.1", override: true},
        {:rulesql, github: "emqx/rulesql", tag: "0.1.4"},

@@ -129,7 +129,13 @@ defmodule EMQXUmbrella.MixProject do
  defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
    [
      {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"},
      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true}
      {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true},
      {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"},
      {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true},
      {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"},
      {:brod, github: "kafka4beam/brod", tag: "3.16.4"},
      {:snappyer, "1.2.8", override: true},
      {:supervisor3, "1.1.11", override: true}
    ]
  end
@@ -44,7 +44,7 @@
{post_hooks,[]}.

{deps,
    [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}
    [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}
    , {redbug, "2.0.7"}
    , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps
    , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}}

@@ -59,7 +59,7 @@
    , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}}
    , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}}
    , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}}
    , {replayq, "0.3.4"}
    , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.4"}}}
    , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}}
    , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.1"}}}
    , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}}
@@ -10,12 +10,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.."
help() {
    echo
    echo "-h|--help: To display this usage info"
    echo "--app lib_dir/app_name: Print apps in json"
    echo "--app lib_dir/app_name: For which app to run start docker-compose, and run common tests"
    echo "--suites SUITE1,SUITE2: Comma separated SUITE names to run. e.g. apps/emqx/test/emqx_SUITE.erl"
    echo "--console: Start EMQX in console mode"
    echo "--attach: Attach to the Erlang docker container without running any test case"
    echo "--only-up: Only start the testbed but do not run CT"
    echo "--keep-up: Keep the testbed running after CT"
}

WHICH_APP='novalue'
CONSOLE='no'
KEEP_UP='no'
ONLY_UP='no'
SUITES=''
ATTACH='no'
while [ "$#" -gt 0 ]; do
    case $1 in
        -h|--help)

@@ -26,10 +34,26 @@ while [ "$#" -gt 0 ]; do
            WHICH_APP="$2"
            shift 2
            ;;
        --only-up)
            ONLY_UP='yes'
            shift 1
            ;;
        --keep-up)
            KEEP_UP='yes'
            shift 1
            ;;
        --attach)
            ATTACH='yes'
            shift 1
            ;;
        --console)
            CONSOLE='yes'
            shift 1
            ;;
        --suites)
            SUITES="$2"
            shift 2
            ;;
        *)
            echo "unknown option $1"
            exit 1

@@ -45,6 +69,16 @@ fi
ERLANG_CONTAINER='erlang24'
DOCKER_CT_ENVS_FILE="${WHICH_APP}/docker-ct"

case "${WHICH_APP}" in
    lib-ee*)
        ## ensure enterprise profile when testing lib-ee applications
        export PROFILE='emqx-enterprise'
        ;;
    *)
        true
        ;;
esac

if [ -f "$DOCKER_CT_ENVS_FILE" ]; then
    # shellcheck disable=SC2002
    CT_DEPS="$(cat "$DOCKER_CT_ENVS_FILE" | xargs)"

@@ -80,6 +114,9 @@ for dep in ${CT_DEPS}; do
            FILES+=( '.ci/docker-compose-file/docker-compose-pgsql-tcp.yaml'
                     '.ci/docker-compose-file/docker-compose-pgsql-tls.yaml' )
            ;;
        kafka)
            FILES+=( '.ci/docker-compose-file/docker-compose-kafka.yaml' )
            ;;
        *)
            echo "unknown_ct_dependency $dep"
            exit 1

@@ -104,13 +141,23 @@ if [[ -t 1 ]]; then
fi
docker exec -i $TTY "$ERLANG_CONTAINER" bash -c 'git config --global --add safe.directory /emqx'

if [ "$CONSOLE" = 'yes' ]; then
if [ "$ONLY_UP" = 'yes' ]; then
    exit 0
fi

if [ "$ATTACH" = 'yes' ]; then
    docker exec -it "$ERLANG_CONTAINER" bash
elif [ "$CONSOLE" = 'yes' ]; then
    docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run"
else
    set +e
    docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct"
    docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct"
    RESULT=$?
    if [ "$KEEP_UP" = 'yes' ]; then
        exit $RESULT
    else
        # shellcheck disable=2086 # no quotes for F_OPTIONS
        docker-compose $F_OPTIONS down
        exit $RESULT
    fi
fi
@@ -8,5 +8,9 @@ set -euo pipefail
# ensure dir
cd -P -- "$(dirname -- "$0")/.."

TESTDIR="$1/test"
find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ','
if [ -z "${EMQX_CT_SUITES:-}" ]; then
    TESTDIR="$1/test"
    find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ','
else
    echo "${EMQX_CT_SUITES}"
fi