diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml new file mode 100644 index 000000000..85532725d --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -0,0 +1,27 @@ +version: '3.9' + +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + container_name: zookeeper + hostname: zookeeper + networks: + emqx_bridge: + kafka_1: + image: wurstmeister/kafka:2.13-2.7.0 + ports: + - "9092:9092" + container_name: kafka-1.emqx.net + hostname: kafka-1.emqx.net + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PLAINTEXT://:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, + networks: + emqx_bridge: diff --git a/.ci/docker-compose-file/docker-compose-python.yaml b/.ci/docker-compose-file/docker-compose-python.yaml index 0b9af4517..14e798c6b 100644 --- a/.ci/docker-compose-file/docker-compose-python.yaml +++ b/.ci/docker-compose-file/docker-compose-python.yaml @@ -2,7 +2,7 @@ version: '3.9' services: python: - container_name: python + container_name: python image: python:3.7.2-alpine3.9 depends_on: - emqx1 diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index ce998656a..24477b21b 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -32,6 +32,7 @@ stop_apps/1, reload/2, app_path/2, + proj_root/0, deps_path/2, flush/0, flush/1 ]). @@ -245,6 +246,14 @@ stop_apps(Apps) -> [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]], ok. +proj_root() -> + filename:join( + lists:takewhile( + fun(X) -> iolist_to_binary(X) =/= <<"_build">> end, + filename:split(app_path(emqx, ".")) + ) + ). + %% backward compatible deps_path(App, RelativePath) -> app_path(App, RelativePath). diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d4d24ef3a..ceca5ea7f 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -334,6 +334,8 @@ get_matched_bridges(Topic) -> Bridges ). +%% TODO: refactor to return the bridge type and name directly, +%% so there is no need to parse the id back into type and name where it is used get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) -> Acc; get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index f0773a8ea..b6fd2e7be 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -44,7 +44,7 @@ ]). %% bi-directional bridge with producer/consumer or ingress/egress configs --define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>). +-define(IS_BI_DIR_BRIDGE(TYPE), TYPE =:= <<"mqtt">>; TYPE =:= <<"kafka">>). -if(?EMQX_RELEASE_EDITION == ee). bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; @@ -71,6 +71,8 @@ bridge_id(BridgeType, BridgeName) -> Type = bin(BridgeType), <<Type/binary, ":", (bin(BridgeName))/binary>>.
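%% For illustration, with a hypothetical bridge named "myproducer":
%%   bridge_id(<<"kafka">>, <<"myproducer">>) returns <<"kafka:myproducer">>,
%% and the new clause added below makes parse_bridge_id/1 accept both
%% <<"kafka:myproducer">> and the hook-point form <<"bridge:kafka:myproducer">>,
%% yielding {kafka, myproducer} in either case.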
+parse_bridge_id(<<"bridge:", BridgeId/binary>>) -> + parse_bridge_id(BridgeId); parse_bridge_id(BridgeId) -> case string:split(bin(BridgeId), ":", all) of [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; @@ -261,7 +263,7 @@ parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) -> %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it %% receives a message from the external database. BName = bridge_id(Type, Name), - Conf#{hookpoint => <<"$bridges/", BName/binary>>}; + Conf#{hookpoint => <<"$bridges/", BName/binary>>, bridge_name => Name}; parse_confs(_Type, _Name, Conf) -> Conf. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 309c34195..8086dfa25 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -93,7 +93,8 @@ %% verify if the resource is working normally call_health_check/3, %% stop the instance - call_stop/3 + call_stop/3, + is_buffer_supported/1 ]). %% list all the instances, id only. @@ -117,7 +118,8 @@ on_batch_query/3, on_query_async/4, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + is_buffer_supported/0 ]). %% when calling emqx_resource:start/1 @@ -155,6 +157,8 @@ | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()}. +-callback is_buffer_supported() -> boolean(). + -spec list_types() -> [module()]. list_types() -> discover_resource_mods(). @@ -256,10 +260,15 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:ets_lookup(ResId) of - {ok, _Group, #{query_mode := QM}} -> - case QM of - sync -> emqx_resource_worker:sync_query(ResId, Request, Opts); - async -> emqx_resource_worker:async_query(ResId, Request, Opts) + {ok, _Group, #{query_mode := QM, mod := Module}} -> + IsBufferSupported = is_buffer_supported(Module), + case {IsBufferSupported, QM} of + {true, _} -> + emqx_resource_worker:simple_sync_query(ResId, Request); + {false, sync} -> + emqx_resource_worker:sync_query(ResId, Request, Opts); + {false, async} -> + emqx_resource_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -336,6 +345,15 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). +-spec is_buffer_supported(module()) -> boolean(). +is_buffer_supported(Module) -> + try + Module:is_buffer_supported() + catch + _:_ -> + false + end. + -spec call_start(manager_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index e4ba92b5c..82b565f6f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -146,14 +146,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ], [matched] ), - ok = emqx_resource_worker_sup:start_workers(ResId, Opts), - case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of + case emqx_resource:is_buffer_supported(ResourceType) of true -> - wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); + %% the resource itself supports + %% buffering, so there is no need for resource workers + ok; false -> - ok - end, - ok.
+ ok = emqx_resource_worker_sup:start_workers(ResId, Opts), + case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of + true -> + wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); + false -> + ok + end + end. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index f350a379c..a79037903 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -1,2 +1,3 @@ mongo mongo_rs_sharded +kafka diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf new file mode 100644 index 000000000..a47bf1249 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -0,0 +1,471 @@ +emqx_ee_bridge_kafka { + config_enable { + desc { + en: "Enable (true) or disable (false) this Kafka bridge." + zh: "启用(true)或停用(false)该 Kafka 数据桥接。" + } + label { + en: "Enable or Disable" + zh: "启用或停用" + } + } + desc_config { + desc { + en: """Configuration for a Kafka bridge.""" + zh: """Kafka 桥接配置""" + } + label { + en: "Kafka Bridge Configuration" + zh: "Kafka 桥接配置" + } + } + desc_type { + desc { + en: """The Bridge Type""" + zh: """桥接类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + desc_name { + desc { + en: """Bridge name, used as a human-readable description of the bridge.""" + zh: """桥接名字,可读的描述。""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } + producer_opts { + desc { + en: "Local MQTT data source and Kafka bridge configs." + zh: "本地 MQTT 数据源和 Kafka 桥接的配置。" + } + label { + en: "MQTT to Kafka" + zh: "MQTT 到 Kafka" + } + } + producer_mqtt_opts { + desc { + en: "MQTT data source. Optional when used as a rule-engine action." + zh: "MQTT 数据源。当用作规则引擎动作时可选。" + } + label { + en: "MQTT Source Topic" + zh: "MQTT 源主题" + } + } + mqtt_topic { + desc { + en: "MQTT topic or topic filter as data source (bridge input)." + zh: "指定 MQTT 主题(或主题过滤器)作为桥接的数据源。" + } + label { + en: "Source MQTT Topic" + zh: "源 MQTT 主题" + } + } + producer_kafka_opts { + desc { + en: "Kafka producer configs." + zh: "Kafka 生产者参数。" + } + label { + en: "Kafka Producer" + zh: "生产者参数" + } + } + bootstrap_hosts { + desc { + en: "A comma-separated list of Kafka host:port endpoints to bootstrap the client." + zh: "用逗号分隔的 host:port 主机列表。" + } + label { + en: "Bootstrap Hosts" + zh: "主机列表" + } + } + connect_timeout { + desc { + en: "Maximum wait time for TCP connection establishment (including authentication time if enabled)." + zh: "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。" + } + label { + en: "Connect Timeout" + zh: "连接超时" + } + } + min_metadata_refresh_interval { + desc { + en: "Minimum time interval the client has to wait before refreshing Kafka broker and topic metadata. " + "Setting a value that is too small may add extra load on Kafka." + zh: "刷新 Kafka broker 和 Kafka 主题元数据的最短时间间隔。设置太小可能会增加 Kafka 压力。" + } + label { + en: "Min Metadata Refresh Interval" + zh: "元数据刷新最小间隔" + } + } + metadata_request_timeout { + desc { + en: "Maximum wait time when fetching metadata from Kafka." + zh: "刷新元数据时最大等待时长。" + } + label { + en: "Metadata Request Timeout" + zh: "元数据请求超时" + } + } + authentication { + desc { + en: "Authentication configs." + zh: "认证参数。" + } + label { + en: "Authentication" + zh: "认证" + } + } + socket_opts { + desc { + en: "Extra socket options." + zh: "更多 Socket 参数设置。" + } + label { + en: "Socket Options" + zh: "Socket 参数" + } + } + auth_sasl_mechanism { + desc { + en: "SASL authentication mechanism."
+ zh: "SASL 认证方法名称。" + } + label { + en: "Mechanism" + zh: "认证方法" + } + } + auth_sasl_username { + desc { + en: "SASL authentication username." + zh: "SASL 认证的用户名。" + } + label { + en: "Username" + zh: "用户名" + } + } + auth_sasl_password { + desc { + en: "SASL authentication password." + zh: "SASL 认证的密码。" + } + label { + en: "Password" + zh: "密码" + } + } + auth_kerberos_principal { + desc { + en: "SASL GSSAPI authentication Kerberos principal. " + "For example client_name@MY.KERBEROS.REALM.MYDOMAIN.COM. " + "NOTE: The realm in use has to be configured in /etc/krb5.conf on EMQX nodes." + zh: "SASL GSSAPI 认证方法的 Kerberos principal," + "例如 client_name@MY.KERBEROS.REALM.MYDOMAIN.COM。" + "注意:这里使用的 realm 需要配置在 EMQX 服务器的 /etc/krb5.conf 中。" + } + label { + en: "Kerberos Principal" + zh: "Kerberos Principal" + } + } + auth_kerberos_keytab_file { + desc { + en: "SASL GSSAPI authentication Kerberos keytab file path. " + "NOTE: This file has to be placed on EMQX nodes, and the EMQX service runner user requires read permission." + zh: "SASL GSSAPI 认证方法的 Kerberos keytab 文件。" + "注意:该文件需要上传到 EMQX 服务器中,且运行 EMQX 服务的系统账户需要有读取权限。" + } + label { + en: "Kerberos keytab file" + zh: "Kerberos keytab 文件" + } + } + socket_send_buffer { + desc { + en: "Fine tune the socket send buffer. The default value is tuned for high throughput." + zh: "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。" + } + label { + en: "Socket Send Buffer Size" + zh: "Socket 发送缓存大小" + } + } + socket_receive_buffer { + desc { + en: "Fine tune the socket receive buffer. The default value is tuned for high throughput." + zh: "TCP socket 的收包缓存调优。默认值是针对高吞吐量的一个推荐值。" + } + label { + en: "Socket Receive Buffer Size" + zh: "Socket 收包缓存大小" + } + } + socket_nodelay { + desc { + en: "When set to 'true', TCP buffered data is sent as soon as possible. " + "Otherwise the OS kernel may buffer small TCP packets for a while (40ms by default)." + zh: "设置为 'true',让系统内核立即发送。否则当需要发送的内容很少时,可能会有一定延迟(默认 40 毫秒)。" + } + label { + en: "No Delay" + zh: "是否延迟发送" + } + } + kafka_topic { + desc { + en: "Kafka topic name" + zh: "Kafka 主题名称" + } + label { + en: "Kafka Topic Name" + zh: "Kafka 主题名称" + } + } + kafka_message { + desc { + en: "Template to render a Kafka message." + zh: "用于生成 Kafka 消息的模版。" + } + label { + en: "Kafka Message Template" + zh: "Kafka 消息模版" + } + } + kafka_message_key { + desc { + en: "Template to render Kafka message key. " + "If the desired variable for this template is not found in the input data, " + "NULL is used." + zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 NULL。" + } + label { + en: "Message Key" + zh: "消息的 Key" + } + } + kafka_message_value { + desc { + en: "Template to render Kafka message value. " + "If the desired variable for this template is not found in the input data, " + "NULL is used." + zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 NULL。" + } + label { + en: "Message Value" + zh: "消息的 Value" + } + } + kafka_message_timestamp { + desc { + en: "Which timestamp to use. " + "The timestamp is expected to be a millisecond-precision Unix epoch " + "which can be in string format, e.g. 1661326462115 or " + "'1661326462115'. " + "When the desired data field for this template is not found, " + "or if the found data is not a valid integer, " + "the current system timestamp will be used." + zh: "生成 Kafka 消息时间戳的模版。" + "该时间必须是一个整型数值(可以是字符串格式)例如 1661326462115 " + "或 '1661326462115'。" + "当所需的输入字段不存在,或不是一个整型时," + "则会使用当前系统时间。" + } + label { + en: "Message Timestamp" + zh: "消息的时间戳" + } + } + max_batch_bytes { + desc { + en: "Maximum bytes to collect in a Kafka message batch. " + "Most of the Kafka brokers default to a limit of 1MB batch size. " + "EMQX's default value is less than 1MB in order to compensate for " + "Kafka message encoding overheads (especially when each individual message is very small). " + "When a single message is over the limit, it is still sent (as a single-element batch)." + zh: "最大消息批量字节数。" + "大多数 Kafka 环境的默认最低值是 1MB,EMQX 的默认值比 1MB 更小是因为需要" + "补偿 Kafka 消息编码所需要的额外字节(尤其是当每条消息都很小的情况下)。" + "当单个消息的大小超过该限制时,它仍然会被发送(相当于该批量中只有单个消息)。" + } + label { + en: "Max Batch Bytes" + zh: "最大批量字节数" + } + } + compression { + desc { + en: "Compression method." + zh: "压缩方法。" + } + label { + en: "Compression" + zh: "压缩" + } + } + partition_strategy { + desc { + en: "Partition strategy is to tell the producer how to dispatch messages to Kafka partitions.\n\n" + "random: Randomly pick a partition for each message\n" + "key_dispatch: Hash Kafka message key to a partition number\n" + zh: "设置消息发布时应该如何选择 Kafka 分区。\n\n" + "random: 为每个消息随机选择一个分区。\n" + "key_dispatch:根据 Kafka 消息 Key 的哈希值选择分区。\n" + } + label { + en: "Partition Strategy" + zh: "分区选择策略" + } + } + required_acks { + desc { + en: "Required acknowledgements for Kafka partition leader to wait for its followers " + "before it sends back the acknowledgement to the EMQX Kafka producer.\n\n" + "all_isr: Require all in-sync replicas to acknowledge.\n" + "leader_only: Require only the partition-leader's acknowledgement.\n" + "none: No need for Kafka to acknowledge at all.\n" + zh: "设置 Kafka leader 在返回给 EMQX 确认之前需要等待多少个 follower 的确认。\n\n" + "all_isr: 需要所有的在线复制者都确认。\n" + "leader_only: 仅需要分区 leader 确认。\n" + "none: 无需 Kafka 回复任何确认。\n" + } + label { + en: "Required Acks" + zh: "Kafka 确认数量" + } + } + partition_count_refresh_interval { + desc { + en: "The time interval for Kafka producer to discover increased number of partitions.\n" + "After the number of partitions is increased in Kafka, EMQX will start taking the \n" + "discovered partitions into account when dispatching messages per partition_strategy." + zh: "配置 Kafka 刷新分区数量的时间间隔。\n" + "EMQX 发现 Kafka 分区数量增加后,会开始按 partition_strategy 配置,把消息发送到新的分区中。" + } + label { + en: "Partition Count Refresh Interval" + zh: "分区数量刷新间隔" + } + } + max_inflight { + desc { + en: "Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. " + "Greater value typically means better throughput. However, there can be a risk of message reordering when this " + "value is greater than 1." + zh: "设置 Kafka 生产者(每个分区一个)在收到 Kafka 的确认前最多发送多少个请求(批量)。" + "调大这个值通常可以增加吞吐量,但是,当该值设置为大于 1 时,存在消息乱序的风险。" + } + label { + en: "Max Inflight" + zh: "飞行窗口" + } + } + producer_buffer { + desc { + en: "Configure producer message buffer.\n\n" + "Tell Kafka producer how to buffer messages when EMQX has more messages to send than " + "Kafka can keep up with, or when Kafka is down.\n\n" + zh: "配置消息缓存的相关参数。\n\n" + "当 EMQX 需要发送的消息超过 Kafka 处理能力,或者当 Kafka 临时下线时,EMQX 内部会将消息缓存起来。" + } + label { + en: "Message Buffer" + zh: "消息缓存" + } + } + buffer_mode { + desc { + en: "Message buffer mode.\n\n" + "memory: Buffer all messages in memory. The messages will be lost in case of EMQX node restart.\n" + "disk: Buffer all messages on disk. The messages on disk are able to survive EMQX node restart.\n" + "hybrid: Buffer messages in memory first; when up to a certain limit " + "(see the segment_bytes config for more information), start offloading " + "messages to disk. Like memory mode, the messages will be lost in case of " + "EMQX node restart."
+ zh: "消息缓存模式。\n" + "memory: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n" + "disc: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n" + "hybrid: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制" + "(配置项 segment_bytes 描述了该限制)后,后续的消息会缓存到磁盘上。" + "与 memory 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。" + } + label { + en: "Buffer Mode" + zh: "缓存模式" + } + } + buffer_per_partition_limit { + desc { + en: "Number of bytes allowed to buffer for each Kafka partition. " + "When this limit is exceeded, old messages will be dropped in a trade for credits " + "for new messages to be buffered." + zh: "为每个 Kafka 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃," + "为新的消息腾出空间。" + } + label { + en: "Per-partition Buffer Limit" + zh: "Kafka 分区缓存上限" + } + } + buffer_segment_bytes { + desc { + en: "Applicable when buffer mode is set to disk or hybrid.\n" + "This value is to specify the size of each on-disk buffer file." + zh: "当缓存模式是 diskhybrid 时适用。" + "该配置用于指定缓存到磁盘上的文件的大小。" + } + label { + en: "Segment File Bytes" + zh: "缓存文件大小" + } + } + buffer_memory_overload_protection { + desc { + en: "Applicable when buffer mode is set to memory or hybrid.\n" + "EMQX will drop old cached messages under high memory pressure. " + "The high memory threshold is defined in config sysmon.os.sysmem_high_watermark." + zh: "缓存模式是 memoryhybrid 时适用。" + "当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。" + "内存压力值由配置项 sysmon.os.sysmem_high_watermark 决定。" + } + label { + en: "Memory Overload Protection" + zh: "内存过载保护" + } + } + auth_username_password { + desc { + en: "Username/password based authentication." + zh: "基于用户名密码的认证。" + } + label { + en: "Username/password Auth" + zh: "用户名密码认证" + } + } + auth_gssapi_kerberos { + desc { + en: "Use GSSAPI/Kerberos authentication." + zh: "使用 GSSAPI/Kerberos 认证。" + } + label { + en: "GSSAPI/Kerberos" + zh: "GSSAPI/Kerberos" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index e986d7983..8c79e7274 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,5 +1,9 @@ {erl_opts, [debug_info]}. 
{deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} + , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}} + , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} + , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} + , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index a578b7d0d..97c884fe9 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -3,7 +3,8 @@ {registered, []}, {applications, [ kernel, - stdlib + stdlib, + emqx_ee_connector ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 840b963cd..cdf0a6439 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -14,6 +14,7 @@ api_schemas(Method) -> [ + ref(emqx_ee_bridge_kafka, Method), ref(emqx_ee_bridge_mysql, Method), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), @@ -26,6 +27,7 @@ api_schemas(Method) -> schema_modules() -> [ + emqx_ee_bridge_kafka, emqx_ee_bridge_hstreamdb, emqx_ee_bridge_influxdb, emqx_ee_bridge_mongodb, @@ -45,6 +47,7 @@ examples(Method) -> lists:foldl(Fun, #{}, schema_modules()). resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); +resource_type(kafka) -> emqx_bridge_impl_kafka; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(mongodb_rs) -> emqx_connector_mongo; resource_type(mongodb_sharded) -> emqx_connector_mongo; @@ -56,6 +59,11 @@ resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb. fields(bridges) -> [ + {kafka, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")), + #{desc => <<"EMQX Enterprise Config">>} + )}, {hstreamdb, mk( hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")), @@ -66,8 +74,9 @@ fields(bridges) -> hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), #{desc => <<"EMQX Enterprise Config">>} )} - ] ++ fields(mongodb) ++ fields(influxdb); -fields(mongodb) -> + ] ++ mongodb_structs() ++ influxdb_structs(). + +mongodb_structs() -> [ {Type, mk( @@ -75,8 +84,9 @@ fields(mongodb) -> #{desc => <<"EMQX Enterprise Config">>} )} || Type <- [mongodb_rs, mongodb_sharded, mongodb_single] - ]; -fields(influxdb) -> + ]. + +influxdb_structs() -> [ {Protocol, mk( diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl new file mode 100644 index 000000000..080af35d0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -0,0 +1,260 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_kafka). + +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%% allow atoms like scram_sha_256 and scram_sha_512 +%% i.e. 
the _256 part does not start with a-z +-elvis([ + {elvis_style, atom_naming_convention, #{ + regex => "^([a-z][a-z0-9]*_?)([a-z0-9]*_?)*$", + enclosed_atoms => ".*" + }} +]). +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"kafka">> => #{ + summary => <<"Kafka Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + bootstrap_hosts => <<"localhost:9092">> + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions + +namespace() -> "bridge_kafka". + +roots() -> ["config"]. + +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("post"); +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {bootstrap_hosts, mk(binary(), #{required => true, desc => ?DESC(bootstrap_hosts)})}, + {connect_timeout, + mk(emqx_schema:duration_ms(), #{ + default => "5s", + desc => ?DESC(connect_timeout) + })}, + {min_metadata_refresh_interval, + mk( + emqx_schema:duration_ms(), + #{ + default => "3s", + desc => ?DESC(min_metadata_refresh_interval) + } + )}, + {metadata_request_timeout, + mk(emqx_schema:duration_ms(), #{ + default => "5s", + desc => ?DESC(metadata_request_timeout) + })}, + {authentication, + mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{ + default => none, desc => ?DESC("authentication") + })}, + {producer, mk(hoconsc:union([none, ref(producer_opts)]), #{desc => ?DESC(producer_opts)})}, + %{consumer, mk(hoconsc:union([none, ref(consumer_opts)]), #{desc => ?DESC(consumer_opts)})}, + {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})} + ] ++ emqx_connector_schema_lib:ssl_fields(); +fields(auth_username_password) -> + [ + {mechanism, + mk(enum([plain, scram_sha_256, scram_sha_512]), #{ + required => true, desc => ?DESC(auth_sasl_mechanism) + })}, + {username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})}, + {password, + mk(binary(), #{required => true, sensitive => true, desc => ?DESC(auth_sasl_password)})} + ]; +fields(auth_gssapi_kerberos) -> + [ + {kerberos_principal, + mk(binary(), #{ + required => true, + desc => ?DESC(auth_kerberos_principal) + })}, + {kerberos_keytab_file, + mk(binary(), #{ + required => true, + desc => ?DESC(auth_kerberos_keytab_file) + })} + ]; +fields(socket_opts) -> + [ + {sndbuf, + mk( + emqx_schema:bytesize(), + #{default => "1024KB", desc => ?DESC(socket_send_buffer)} + )}, + {recbuf, + mk( + emqx_schema:bytesize(), + #{default => "1024KB", desc => ?DESC(socket_receive_buffer)} + )}, + {nodelay, + mk( + boolean(), + #{default => true, desc => ?DESC(socket_nodelay)} + )} + ]; +fields(producer_opts) -> + [ + {mqtt, mk(ref(producer_mqtt_opts), #{desc => ?DESC(producer_mqtt_opts)})}, + {kafka, + mk(ref(producer_kafka_opts), #{ + required => true, + desc => ?DESC(producer_kafka_opts) + })} + ]; +fields(producer_mqtt_opts) -> + [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}]; +fields(producer_kafka_opts) -> + [ + {topic, mk(string(), #{required => true, 
desc => ?DESC(kafka_topic)})}, + {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, + {max_batch_bytes, + mk(emqx_schema:bytesize(), #{default => "896KB", desc => ?DESC(max_batch_bytes)})}, + {compression, + mk(enum([no_compression, snappy, gzip]), #{ + default => no_compression, desc => ?DESC(compression) + })}, + {partition_strategy, + mk( + enum([random, key_dispatch]), + #{default => random, desc => ?DESC(partition_strategy)} + )}, + {required_acks, + mk( + enum([all_isr, leader_only, none]), + #{ + default => all_isr, + desc => ?DESC(required_acks) + } + )}, + {partition_count_refresh_interval, + mk( + emqx_schema:duration_s(), + #{ + default => "60s", + desc => ?DESC(partition_count_refresh_interval) + } + )}, + {max_inflight, + mk( + pos_integer(), + #{ + default => 10, + desc => ?DESC(max_inflight) + } + )}, + {buffer, + mk(ref(producer_buffer), #{ + required => false, + desc => ?DESC(producer_buffer) + })} + ]; +fields(kafka_message) -> + [ + {key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})}, + {value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})}, + {timestamp, + mk(string(), #{ + default => "${timestamp}", desc => ?DESC(kafka_message_timestamp) + })} + ]; +fields(producer_buffer) -> + [ + {mode, + mk( + enum([memory, disk, hybrid]), + #{default => memory, desc => ?DESC(buffer_mode)} + )}, + {per_partition_limit, + mk( + emqx_schema:bytesize(), + #{default => "2GB", desc => ?DESC(buffer_per_partition_limit)} + )}, + {segment_bytes, + mk( + emqx_schema:bytesize(), + #{default => "100MB", desc => ?DESC(buffer_segment_bytes)} + )}, + {memory_overload_protection, + mk(boolean(), #{ + %% different from 4.x + default => true, + desc => ?DESC(buffer_memory_overload_protection) + })} + ]. + +% fields(consumer_opts) -> +% [ +% {kafka, mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})}, +% {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})} +% ]; +% fields(consumer_mqtt_opts) -> +% [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})} +% ]; +% fields(consumer_kafka_opts) -> +% [ {topic, mk(string(), #{desc => ?DESC(consumer_kafka_topic)})} +% ]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Kafka using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal +type_field() -> + {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + +ref(Name) -> + hoconsc:ref(?MODULE, Name). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl new file mode 100644 index 000000000..d1fad4765 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% Kafka bridge resource implementation, delegating to the producer module +-module(emqx_bridge_impl_kafka). +-behaviour(emqx_resource).
+ +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_get_status/2, + is_buffer_supported/0 +]). + +is_buffer_supported() -> true. + +callback_mode() -> async_if_possible. + +on_start(InstId, Config) -> + emqx_bridge_impl_kafka_producer:on_start(InstId, Config). + +on_stop(InstId, State) -> + emqx_bridge_impl_kafka_producer:on_stop(InstId, State). + +on_query(InstId, Msg, State) -> + emqx_bridge_impl_kafka_producer:on_query(InstId, Msg, State). + +on_get_status(InstId, State) -> + emqx_bridge_impl_kafka_producer:on_get_status(InstId, State). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl new file mode 100644 index 000000000..ce82dbe2d --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -0,0 +1,270 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_impl_kafka_producer). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_get_status/2 +]). + +-export([on_kafka_ack/3]). + +-include_lib("emqx/include/logger.hrl"). + +callback_mode() -> async_if_possible. + +%% @doc Config schema is defined in emqx_ee_bridge_kafka. +on_start(InstId, Config) -> + #{ + bridge_name := BridgeName, + bootstrap_hosts := Hosts0, + connect_timeout := ConnTimeout, + metadata_request_timeout := MetaReqTimeout, + min_metadata_refresh_interval := MinMetaRefreshInterval, + socket_opts := SocketOpts, + authentication := Auth, + ssl := SSL + } = Config, + %% it's a bug if producer config is not found + %% the caller should not try to start a producer if + %% there is no producer config + ProducerConfigWrapper = get_required(producer, Config, no_kafka_producer_config), + ProducerConfig = get_required(kafka, ProducerConfigWrapper, no_kafka_producer_parameters), + MessageTemplate = get_required(message, ProducerConfig, no_kafka_message_template), + Hosts = hosts(Hosts0), + ClientId = make_client_id(BridgeName), + ClientConfig = #{ + min_metadata_refresh_interval => MinMetaRefreshInterval, + connect_timeout => ConnTimeout, + client_id => ClientId, + request_timeout => MetaReqTimeout, + extra_sock_opts => socket_opts(SocketOpts), + sasl => sasl(Auth), + ssl => ssl(SSL) + }, + #{ + topic := KafkaTopic + } = ProducerConfig, + case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of + {ok, _} -> + ?SLOG(info, #{ + msg => "kafka_client_started", + instance_id => InstId, + kafka_hosts => Hosts + }); + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_start_kafka_client", + instance_id => InstId, + kafka_hosts => Hosts, + reason => Reason + }), + throw(failed_to_start_kafka_client) + end, + WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig), + case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of + {ok, Producers} -> + {ok, #{ + message_template => compile_message_template(MessageTemplate), + client_id => ClientId, + producers => Producers + }}; + {error, Reason2} -> + ?SLOG(error, #{ + msg => "failed_to_start_kafka_producer", + instance_id => InstId, + kafka_hosts => Hosts, + kafka_topic => KafkaTopic, + reason => Reason2 + }), + throw(failed_to_start_kafka_producer) + end. 
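%% A sketch of the state returned by on_start/2 above (field values are
%% illustrative examples, not literals from this change); this is the State
%% later passed to on_stop/2 and on_query/3:
%%   #{message_template => CompiledTemplates,
%%     client_id => <<"my_bridge:emqx@127.0.0.1">>,
%%     producers => WolffProducers}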
+ +on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> + with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, + #{ + msg => "failed_to_delete_kafka_producer", + client_id => ClientID + } + ), + with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientID + } + ). + +%% @doc The callback API for rule-engine (or bridge without rules) +%% The input argument `Message' is an enriched format (as a map()) +%% of the original #message{} record. +%% The enrichment is done by rule-engine or by the data bridge framework. +%% E.g. the output of rule-engine process chain +%% or the direct mapping from an MQTT message. +on_query(_InstId, {send_message, Message}, #{message_template := Template, producers := Producers}) -> + KafkaMessage = render_message(Template, Message), + %% The returned information is discarded here. + %% If the producer process is down when sending, this function will + %% raise an error exception, which is to be caught by the caller of this callback {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}), + ok. + +compile_message_template(#{ + key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate +}) -> + #{ + key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate), + value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate), + timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate) + }. + +render_message( + #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message +) -> + #{ + key => render(KeyTemplate, Message), + value => render(ValueTemplate, Message), + ts => render_timestamp(TimestampTemplate, Message) + }. + +render(Template, Message) -> + emqx_plugin_libs_rule:proc_tmpl(Template, Message). + +render_timestamp(Template, Message) -> + try + binary_to_integer(render(Template, Message)) + catch + _:_ -> + erlang:system_time(millisecond) + end. + +on_kafka_ack(_Partition, _Offset, _Extra) -> + %% Do nothing so far. + %% Maybe need to bump some counters? + ok. + +on_get_status(_InstId, _State) -> + connected. + +%% Parse a comma-separated host:port list into a [{Host, Port}] list +hosts(Hosts) when is_binary(Hosts) -> + hosts(binary_to_list(Hosts)); +hosts(Hosts) when is_list(Hosts) -> + kpro:parse_endpoints(Hosts). + +%% Extra socket options, such as sndbuf size etc. +socket_opts(Opts) when is_map(Opts) -> + socket_opts(maps:to_list(Opts)); +socket_opts(Opts) when is_list(Opts) -> + socket_opts_loop(Opts, []). + +socket_opts_loop([], Acc) -> + lists:reverse(Acc); +socket_opts_loop([{T, Bytes} | Rest], Acc) when + T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer +-> + Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)], + socket_opts_loop(Rest, Acc1); +socket_opts_loop([Other | Rest], Acc) -> + socket_opts_loop(Rest, [Other | Acc]). + +%% https://www.erlang.org/doc/man/inet.html +%% For TCP it is recommended to have val(buffer) >= val(recbuf) +%% to avoid performance issues because of unnecessary copying. +adjust_socket_buffer(Bytes, Opts) -> + case lists:keytake(buffer, 1, Opts) of + false -> + [{buffer, Bytes} | Opts]; + {value, {buffer, Bytes1}, Acc1} -> + [{buffer, max(Bytes1, Bytes)} | Acc1] + end.
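%% Worked example of the loop above, with assumed sizes:
%%   socket_opts_loop([{sndbuf, 1024}, {recbuf, 4096}], [])
%% first pairs {buffer, 1024} with sndbuf, then raises it to
%% max(1024, 4096) when recbuf is handled, returning
%%   [{sndbuf, 1024}, {buffer, 4096}, {recbuf, 4096}]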
+ +sasl(none) -> + undefined; +sasl(#{mechanism := Mechanism, username := Username, password := Password}) -> + {Mechanism, Username, emqx_secret:wrap(Password)}; +sasl(#{ + kerberos_principal := Principal, + kerberos_keytab_file := KeyTabFile +}) -> + {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}. + +ssl(#{enable := true} = SSL) -> + emqx_tls_lib:to_client_opts(SSL); +ssl(_) -> + []. + +producers_config(BridgeName, ClientId, Input) -> + #{ + max_batch_bytes := MaxBatchBytes, + compression := Compression, + partition_strategy := PartitionStrategy, + required_acks := RequiredAcks, + partition_count_refresh_interval := PCntRefreshInterval, + max_inflight := MaxInflight, + buffer := #{ + mode := BufferMode, + per_partition_limit := PerPartitionLimit, + segment_bytes := SegmentBytes, + memory_overload_protection := MemOLP + } + } = Input, + + {OffloadMode, ReplayqDir} = + case BufferMode of + memory -> {false, false}; + disk -> {false, replayq_dir(ClientId)}; + hybrid -> {true, replayq_dir(ClientId)} + end, + #{ + name => make_producer_name(BridgeName), + partitioner => PartitionStrategy, + partition_count_refresh_interval_seconds => PCntRefreshInterval, + replayq_dir => ReplayqDir, + replayq_offload_mode => OffloadMode, + replayq_max_total_bytes => PerPartitionLimit, + replayq_seg_bytes => SegmentBytes, + drop_if_highmem => MemOLP, + required_acks => RequiredAcks, + max_batch_bytes => MaxBatchBytes, + max_send_ahead => MaxInflight - 1, + compression => Compression + }. + +replayq_dir(ClientId) -> + filename:join([emqx:data_dir(), "kafka", ClientId]). + +%% The client ID should be unique, to make Kafka-side troubleshooting easier. +make_client_id(BridgeName) when is_atom(BridgeName) -> + make_client_id(atom_to_list(BridgeName)); +make_client_id(BridgeName) -> + iolist_to_binary([BridgeName, ":", atom_to_list(node())]). + +%% Producer name must be an atom which will be used as an ETS table name for +%% partition worker lookup. +make_producer_name(BridgeName) when is_atom(BridgeName) -> + make_producer_name(atom_to_list(BridgeName)); +make_producer_name(BridgeName) -> + list_to_atom("kafka_producer_" ++ BridgeName). + +with_log_at_error(Fun, Log) -> + try + Fun() + catch + C:E -> + ?SLOG(error, Log#{ + exception => C, + reason => E + }) + end. + +get_required(Field, Config, Throw) -> + Value = maps:get(Field, Config, none), + Value =:= none andalso throw(Throw), + Value. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl new file mode 100644 index 000000000..8d01d0d69 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_impl_kafka_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("brod/include/brod.hrl"). + +-define(PRODUCER, emqx_bridge_impl_kafka). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE).
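%% emqx_common_test_helpers:all/1 derives the CT case list from this module's
%% exported t_* functions, so the t_publish case below is picked up
%% automatically without an explicit case list.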
+ +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(brod), + {ok, _} = application:ensure_all_started(wolff), + Config. + +end_per_suite(_) -> + ok. + +t_publish(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "kafka_hosts_string" => kafka_hosts_string(), + "kafka_topic" => KafkaTopic + }), + InstId = <<"InstanceID">>, + Time = erlang:system_time(millisecond), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + {ok, State} = ?PRODUCER:on_start(InstId, Conf), + ok = ?PRODUCER:on_query(InstId, {send_message, Msg}, State), + {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), + ok = ?PRODUCER:on_stop(InstId, State), + ok. + +config(Args) -> + {ok, Conf} = hocon:binary(hocon_config(Args)), + #{config := Parsed} = hocon_tconf:check_plain( + emqx_ee_bridge_kafka, + #{<<"config">> => Conf}, + #{atom_key => true} + ), + Parsed#{bridge_name => "testbridge"}. + +hocon_config(Args) -> + Hocon = bbmustache:render(iolist_to_binary(hocon_config_template()), Args), + Hocon. + +%% erlfmt-ignore +hocon_config_template() -> +""" +bootstrap_hosts = \"{{ kafka_hosts_string }}\" +enable = true +authentication = none +producer = { + mqtt { + topic = \"t/#\" + } + kafka = { + topic = \"{{ kafka_topic }}\" + } +} +""". + +kafka_hosts_string() -> + "kafka-1.emqx.net:9092,". + +kafka_hosts() -> + kpro:parse_endpoints(kafka_hosts_string()). + +resolve_kafka_offset(Hosts, Topic, Partition) -> + brod:resolve_offset(Hosts, Topic, Partition, latest). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 675a934aa..c1b86d20b 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -5,7 +5,9 @@ kernel, stdlib, hstreamdb_erl, - influxdb + influxdb, + wolff, + brod ]}, {env, []}, {modules, []}, diff --git a/mix.exs b/mix.exs index d871ddf82..2b7f1419b 100644 --- a/mix.exs +++ b/mix.exs @@ -44,7 +44,7 @@ defmodule EMQXUmbrella.MixProject do # we need several overrides here because dependencies specify # other exact versions, and not ranges. 
[ - {:lc, github: "emqx/lc", tag: "0.3.1"}, + {:lc, github: "emqx/lc", tag: "0.3.2", override: true}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, {:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true}, @@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true}, - {:replayq, "0.3.4", override: true}, + {:replayq, github: "emqx/replayq", tag: "0.3.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.1", override: true}, {:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, @@ -129,7 +129,13 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true} + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true}, + {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"}, + {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true}, + {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, + {:brod, github: "kafka4beam/brod", tag: "3.16.4"}, + {:snappyer, "1.2.8", override: true}, + {:supervisor3, "1.1.11", override: true} ] end diff --git a/rebar.config b/rebar.config index e29c5f2c7..e5bab7c6d 100644 --- a/rebar.config +++ b/rebar.config @@ -44,7 +44,7 @@ {post_hooks,[]}. {deps, - [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} + [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}} , {redbug, "2.0.7"} , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} @@ -59,7 +59,7 @@ , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} - , {replayq, "0.3.4"} + , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.1"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 2c87bb0cf..c478ce005 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -10,12 +10,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.." help() { echo echo "-h|--help: To display this usage info" - echo "--app lib_dir/app_name: Print apps in json" + echo "--app lib_dir/app_name: For which app to start docker-compose, and run common tests" + echo "--suites SUITE1,SUITE2: Comma-separated SUITE names to run, e.g.
apps/emqx/test/emqx_SUITE.erl" echo "--console: Start EMQX in console mode" + echo "--attach: Attach to the Erlang docker container without running any test case" + echo "--only-up: Only start the testbed but do not run CT" + echo "--keep-up: Keep the testbed running after CT" } WHICH_APP='novalue' CONSOLE='no' +KEEP_UP='no' +ONLY_UP='no' +SUITES='' +ATTACH='no' while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -26,10 +34,26 @@ while [ "$#" -gt 0 ]; do WHICH_APP="$2" shift 2 ;; + --only-up) + ONLY_UP='yes' + shift 1 + ;; + --keep-up) + KEEP_UP='yes' + shift 1 + ;; + --attach) + ATTACH='yes' + shift 1 + ;; --console) CONSOLE='yes' shift 1 ;; + --suites) + SUITES="$2" + shift 2 + ;; *) echo "unknown option $1" exit 1 @@ -45,6 +69,16 @@ fi ERLANG_CONTAINER='erlang24' DOCKER_CT_ENVS_FILE="${WHICH_APP}/docker-ct" +case "${WHICH_APP}" in + lib-ee*) + ## ensure enterprise profile when testing lib-ee applications + export PROFILE='emqx-enterprise' + ;; + *) + true + ;; +esac + if [ -f "$DOCKER_CT_ENVS_FILE" ]; then # shellcheck disable=SC2002 CT_DEPS="$(cat "$DOCKER_CT_ENVS_FILE" | xargs)" @@ -80,6 +114,9 @@ for dep in ${CT_DEPS}; do FILES+=( '.ci/docker-compose-file/docker-compose-pgsql-tcp.yaml' '.ci/docker-compose-file/docker-compose-pgsql-tls.yaml' ) ;; + kafka) + FILES+=( '.ci/docker-compose-file/docker-compose-kafka.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 @@ -104,13 +141,23 @@ if [[ -t 1 ]]; then fi docker exec -i $TTY "$ERLANG_CONTAINER" bash -c 'git config --global --add safe.directory /emqx' -if [ "$CONSOLE" = 'yes' ]; then +if [ "$ONLY_UP" = 'yes' ]; then + exit 0 +fi + +if [ "$ATTACH" = 'yes' ]; then + docker exec -it "$ERLANG_CONTAINER" bash +elif [ "$CONSOLE" = 'yes' ]; then docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run" else set +e - docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct" + docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct" RESULT=$? - # shellcheck disable=2086 # no quotes for F_OPTIONS - docker-compose $F_OPTIONS down - exit $RESULT + if [ "$KEEP_UP" = 'yes' ]; then + exit $RESULT + else + # shellcheck disable=2086 # no quotes for F_OPTIONS + docker-compose $F_OPTIONS down + exit $RESULT + fi fi diff --git a/scripts/find-suites.sh b/scripts/find-suites.sh index 4d2fd3bee..e7c1b422e 100755 --- a/scripts/find-suites.sh +++ b/scripts/find-suites.sh @@ -8,5 +8,9 @@ set -euo pipefail # ensure dir cd -P -- "$(dirname -- "$0")/.." -TESTDIR="$1/test" -find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ',' +if [ -z "${EMQX_CT_SUITES:-}" ]; then + TESTDIR="$1/test" + find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ',' +else + echo "${EMQX_CT_SUITES}" +fi
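# Example usage of the new flags (hypothetical invocation): start the Kafka
# testbed for the enterprise bridge app, run a single suite, and keep the
# containers running afterwards:
#
#   ./scripts/ct/run.sh --app lib-ee/emqx_ee_bridge \
#     --suites lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl \
#     --keep-up
#
# The --suites value is exported as EMQX_CT_SUITES into the Erlang container,
# where scripts/find-suites.sh (above) echoes it back instead of scanning the
# app's test directory.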