From 0c1595be0275b43c3686c0bd6dcc123a8979e473 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 12 Sep 2022 10:51:35 +0200 Subject: [PATCH 01/17] feat: Add Kafka connector --- .../docker-compose-kafka.yaml | 27 + .../docker-compose-python.yaml | 2 +- apps/emqx/test/emqx_common_test_helpers.erl | 9 + apps/emqx_bridge/src/emqx_bridge.erl | 2 + apps/emqx_bridge/src/emqx_bridge_resource.erl | 6 +- apps/emqx_resource/src/emqx_resource.erl | 30 +- .../src/emqx_resource_manager.erl | 18 +- lib-ee/emqx_ee_bridge/docker-ct | 1 + .../i18n/emqx_ee_bridge_kafka.conf | 471 ++++++++++++++++++ lib-ee/emqx_ee_bridge/rebar.config | 4 + .../emqx_ee_bridge/src/emqx_ee_bridge.app.src | 3 +- lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl | 18 +- .../src/emqx_ee_bridge_kafka.erl | 260 ++++++++++ .../src/kafka/emqx_bridge_impl_kafka.erl | 33 ++ .../kafka/emqx_bridge_impl_kafka_producer.erl | 270 ++++++++++ .../emqx_bridge_impl_kafka_producer_SUITE.erl | 90 ++++ .../src/emqx_ee_connector.app.src | 4 +- mix.exs | 12 +- rebar.config | 4 +- scripts/ct/run.sh | 59 ++- scripts/find-suites.sh | 8 +- 21 files changed, 1297 insertions(+), 34 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-kafka.yaml create mode 100644 lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf create mode 100644 lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl create mode 100644 lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl create mode 100644 lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl create mode 100644 lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml new file mode 100644 index 000000000..85532725d --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -0,0 +1,27 @@ +version: '3.9' + +services: + zookeeper: + image: wurstmeister/zookeeper + ports: + - "2181:2181" + container_name: zookeeper + hostname: zookeeper + networks: + emqx_bridge: + kafka_1: + image: wurstmeister/kafka:2.13-2.7.0 + ports: + - "9092:9092" + container_name: kafka-1.emqx.net + hostname: kafka-1.emqx.net + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PLAINTEXT://:9092 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, + networks: + emqx_bridge: diff --git a/.ci/docker-compose-file/docker-compose-python.yaml b/.ci/docker-compose-file/docker-compose-python.yaml index 0b9af4517..14e798c6b 100644 --- a/.ci/docker-compose-file/docker-compose-python.yaml +++ b/.ci/docker-compose-file/docker-compose-python.yaml @@ -2,7 +2,7 @@ version: '3.9' services: python: - container_name: python + container_name: python image: python:3.7.2-alpine3.9 depends_on: - emqx1 diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index ce998656a..24477b21b 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -32,6 +32,7 @@ stop_apps/1, reload/2, app_path/2, + proj_root/0, deps_path/2, flush/0, flush/1 @@ -245,6 +246,14 @@ stop_apps(Apps) -> [application:stop(App) || App <- Apps ++ [emqx, ekka, mria, mnesia]], ok. +proj_root() -> + filename:join( + lists:takewhile( + fun(X) -> iolist_to_binary(X) =/= <<"_build">> end, + filename:split(app_path(emqx, ".")) + ) + ). + %% backward compatible deps_path(App, RelativePath) -> app_path(App, RelativePath). diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index d4d24ef3a..ceca5ea7f 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -334,6 +334,8 @@ get_matched_bridges(Topic) -> Bridges ). +%% TODO: refactor to return bridge type, and bridge name directly +%% so there is no need to parse the id back to type and name at where it is used get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) -> Acc; get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index f0773a8ea..b6fd2e7be 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -44,7 +44,7 @@ ]). %% bi-directional bridge with producer/consumer or ingress/egress configs --define(IS_BI_DIR_BRIDGE(TYPE), TYPE == <<"mqtt">>; TYPE == <<"kafka">>). +-define(IS_BI_DIR_BRIDGE(TYPE), TYPE =:= <<"mqtt">>; TYPE =:= <<"kafka">>). -if(?EMQX_RELEASE_EDITION == ee). bridge_to_resource_type(<<"mqtt">>) -> emqx_connector_mqtt; @@ -71,6 +71,8 @@ bridge_id(BridgeType, BridgeName) -> Type = bin(BridgeType), <>. +parse_bridge_id(<<"bridge:", BridgeId/binary>>) -> + parse_bridge_id(BridgeId); parse_bridge_id(BridgeId) -> case string:split(bin(BridgeId), ":", all) of [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; @@ -261,7 +263,7 @@ parse_confs(Type, Name, Conf) when ?IS_BI_DIR_BRIDGE(Type) -> %% hookpoint. The underlying driver will run `emqx_hooks:run/3` when it %% receives a message from the external database. BName = bridge_id(Type, Name), - Conf#{hookpoint => <<"$bridges/", BName/binary>>}; + Conf#{hookpoint => <<"$bridges/", BName/binary>>, bridge_name => Name}; parse_confs(_Type, _Name, Conf) -> Conf. diff --git a/apps/emqx_resource/src/emqx_resource.erl b/apps/emqx_resource/src/emqx_resource.erl index 309c34195..8086dfa25 100644 --- a/apps/emqx_resource/src/emqx_resource.erl +++ b/apps/emqx_resource/src/emqx_resource.erl @@ -93,7 +93,8 @@ %% verify if the resource is working normally call_health_check/3, %% stop the instance - call_stop/3 + call_stop/3, + is_buffer_supported/1 ]). %% list all the instances, id only. @@ -117,7 +118,8 @@ on_batch_query/3, on_query_async/4, on_batch_query_async/4, - on_get_status/2 + on_get_status/2, + is_buffer_supported/0 ]). %% when calling emqx_resource:start/1 @@ -155,6 +157,8 @@ | {resource_status(), resource_state()} | {resource_status(), resource_state(), term()}. +-callback is_buffer_supported() -> boolean(). + -spec list_types() -> [module()]. list_types() -> discover_resource_mods(). @@ -256,10 +260,15 @@ query(ResId, Request) -> Result :: term(). query(ResId, Request, Opts) -> case emqx_resource_manager:ets_lookup(ResId) of - {ok, _Group, #{query_mode := QM}} -> - case QM of - sync -> emqx_resource_worker:sync_query(ResId, Request, Opts); - async -> emqx_resource_worker:async_query(ResId, Request, Opts) + {ok, _Group, #{query_mode := QM, mod := Module}} -> + IsBufferSupported = is_buffer_supported(Module), + case {IsBufferSupported, QM} of + {true, _} -> + emqx_resource_worker:simple_sync_query(ResId, Request); + {false, sync} -> + emqx_resource_worker:sync_query(ResId, Request, Opts); + {false, async} -> + emqx_resource_worker:async_query(ResId, Request, Opts) end; {error, not_found} -> ?RESOURCE_ERROR(not_found, "resource not found") @@ -336,6 +345,15 @@ list_group_instances(Group) -> emqx_resource_manager:list_group(Group). get_callback_mode(Mod) -> Mod:callback_mode(). +-spec is_buffer_supported(module()) -> boolean(). +is_buffer_supported(Module) -> + try + Module:is_buffer_supported() + catch + _:_ -> + false + end. + -spec call_start(manager_id(), module(), resource_config()) -> {ok, resource_state()} | {error, Reason :: term()}. call_start(MgrId, Mod, Config) -> diff --git a/apps/emqx_resource/src/emqx_resource_manager.erl b/apps/emqx_resource/src/emqx_resource_manager.erl index e4ba92b5c..82b565f6f 100644 --- a/apps/emqx_resource/src/emqx_resource_manager.erl +++ b/apps/emqx_resource/src/emqx_resource_manager.erl @@ -146,14 +146,20 @@ create(MgrId, ResId, Group, ResourceType, Config, Opts) -> ], [matched] ), - ok = emqx_resource_worker_sup:start_workers(ResId, Opts), - case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of + case emqx_resource:is_buffer_supported(ResourceType) of true -> - wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); + %% the resource it self supports + %% buffer, so there is no need for resource workers + ok; false -> - ok - end, - ok. + ok = emqx_resource_worker_sup:start_workers(ResId, Opts), + case maps:get(start_after_created, Opts, ?START_AFTER_CREATED) of + true -> + wait_for_ready(ResId, maps:get(start_timeout, Opts, ?START_TIMEOUT)); + false -> + ok + end + end. %% @doc Called from `emqx_resource` when doing a dry run for creating a resource instance. %% diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index f350a379c..a79037903 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -1,2 +1,3 @@ mongo mongo_rs_sharded +kafka diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf new file mode 100644 index 000000000..a47bf1249 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -0,0 +1,471 @@ +emqx_ee_bridge_kafka { + config_enable { + desc { + en: "Enable (true) or disable (false) this Kafka bridge." + zh: "启用(true)或停用该(false)Kafka 数据桥接。" + } + label { + en: "Enable or Disable" + zh: "启用或停用" + } + } + desc_config { + desc { + en: """Configuration for a Kafka bridge.""" + zh: """Kafka 桥接配置""" + } + label { + en: "Kafka Bridge Configuration" + zh: "Kafka 桥接配置" + } + } + desc_type { + desc { + en: """The Bridge Type""" + zh: """桥接类型""" + } + label { + en: "Bridge Type" + zh: "桥接类型" + } + } + desc_name { + desc { + en: """Bridge name, used as a human-readable description of the bridge.""" + zh: """桥接名字,可读描述""" + } + label { + en: "Bridge Name" + zh: "桥接名字" + } + } + producer_opts { + desc { + en: "Local MQTT data source and Kafka bridge configs." + zh: "本地 MQTT 数据源和 Kafka 桥接的配置。" + } + label { + en: "MQTT to Kafka" + zh: "MQTT 到 Kafka" + } + } + producer_mqtt_opts { + desc { + en: "MQTT data source. Optional when used as a rule-engine action." + zh: "需要桥接到 MQTT 源主题。" + } + label { + en: "MQTT Source Topic" + zh: "MQTT 源主题" + } + } + mqtt_topic { + desc { + en: "MQTT topic or topic as data source (bridge input)." + zh: "指定 MQTT 主题作为桥接的数据源" + } + label { + en: "Source MQTT Topic" + zh: "源 MQTT 主题" + } + } + producer_kafka_opts { + desc { + en: "Kafka producer configs." + zh: "Kafka 生产者参数。" + } + label { + en: "Kafka Producer" + zh: "生产者参数" + } + } + bootstrap_hosts { + desc { + en: "A comma separated list of Kafka host:port endpoints to bootstrap the client." + zh: "用逗号分隔的 host:port 主机列表。" + } + label { + en: "Bootstrap Hosts" + zh: "主机列表" + } + } + connect_timeout { + desc { + en: "Maximum wait time for TCP connection establishment (including authentication time if enabled)." + zh: "建立 TCP 连接时的最大等待时长(若启用认证,这个等待时长也包含完成认证所需时间)。" + } + label { + en: "Connect Timeout" + zh: "连接超时" + } + } + min_metadata_refresh_interval { + desc { + en: "Minimum time interval the client has to wait before refreshing Kafka broker and topic metadata. " + "Setting too small value may add extra load on Kafka." + zh: "刷新 Kafka broker 和 Kafka 主题元数据段最短时间间隔。设置太小可能会增加 Kafka 压力。" + } + label { + en: "Min Metadata Refresh Interval" + zh: "元数据刷新最小间隔" + } + } + metadata_request_timeout { + desc { + en: "Maximum wait time when fetching metadata from Kafka." + zh: "刷新元数据时最大等待时长。" + } + label { + en: "Metadata Request Timeout" + zh: "元数据请求超时" + } + } + authentication { + desc { + en: "Authentication configs." + zh: "认证参数。" + } + label { + en: "Authentication" + zh: "认证" + } + } + socket_opts { + desc { + en: "Extra socket options." + zh: "更多 Socket 参数设置。" + } + label { + en: "Socket Options" + zh: "Socket 参数" + } + } + auth_sasl_mechanism { + desc { + en: "SASL authentication mechanism." + zh: "SASL 认证方法名称。" + } + label { + en: "Mechanism" + zh: "认证方法" + } + } + auth_sasl_username { + desc { + en: "SASL authentication username." + zh: "SASL 认证的用户名。" + } + label { + en: "Username" + zh: "用户名" + } + } + auth_sasl_password { + desc { + en: "SASL authentication password." + zh: "SASL 认证的密码。" + } + label { + en: "Password" + zh: "密码" + } + } + auth_kerberos_principal { + desc { + en: "SASL GSSAPI authentication Kerberos principal. " + "For example client_name@MY.KERBEROS.REALM.MYDOMAIN.COM, " + "NOTE: The realm in use has to be configured in /etc/krb5.conf in EMQX nodes." + zh: "SASL GSSAPI 认证方法的 Kerberos principal," + "例如 client_name@MY.KERBEROS.REALM.MYDOMAIN.COM" + "注意:这里使用的 realm 需要配置在 EMQX 服务器的 /etc/krb5.conf 中" + } + label { + en: "Kerberos Principal" + zh: "Kerberos Principal" + } + } + auth_kerberos_keytab_file { + desc { + en: "SASL GSSAPI authentication Kerberos keytab file path. " + "NOTE: This file has to be placed in EMQX nodes, and the EMQX service runner user requires read permission." + zh: "SASL GSSAPI 认证方法的 Kerberos keytab 文件。" + "注意:该文件需要上传到 EMQX 服务器中,且运行 EMQX 服务的系统账户需要有读取权限。" + } + label { + en: "Kerberos keytab file" + zh: "Kerberos keytab 文件" + } + } + socket_send_buffer { + desc { + en: "Fine tune the socket send buffer. The default value is tuned for high throughput." + zh: "TCP socket 的发送缓存调优。默认值是针对高吞吐量的一个推荐值。" + } + label { + en: "Socket Send Buffer Size" + zh: "Socket 发送缓存大小" + } + } + socket_receive_buffer { + desc { + en: "Fine tune the socket receive buffer. The default value is tuned for high throughput." + zh: "TCP socket 的收包缓存调优。默认值是针对高吞吐量的一个推荐值。" + } + label { + en: "Socket Receive Buffer Size" + zh: "Socket 收包缓存大小" + } + } + socket_nodelay { + desc { + en: "When set to 'true', TCP buffer sent as soon as possible. " + "Otherwise the OS kernel may buffer small TCP packets for a while (40ms by default)." + zh: "设置 ‘true' 让系统内核立即发送。否则当需要发送当内容很少时,可能会有一定延迟(默认 40 毫秒)。" + } + label { + en: "No Delay" + zh: "是否延迟发送" + } + } + kafka_topic { + desc { + en: "Kafka topic name" + zh: "Kafka 主题名称" + } + label { + en: "Kafka Topic Name" + zh: "Kafka 主题名称" + } + } + kafka_message { + desc { + en: "Template to render a Kafka message." + zh: "用于生成 Kafka 消息的模版。" + } + label { + en: "Kafka Message Template" + zh: "Kafka 消息模版" + } + } + kafka_message_key { + desc { + en: "Template to render Kafka message key. " + "If the desired variable for this template is not found in the input data " + "NULL is used." + zh: "生成 Kafka 消息 Key 的模版。当所需要的输入没有时,会使用 NULL。" + } + label { + en: "Message Key" + zh: "消息的 Key" + } + } + kafka_message_value { + desc { + en: "Template to render Kafka message value. " + "If the desired variable for this template is not found in the input data " + "NULL is used." + zh: "生成 Kafka 消息 Value 的模版。当所需要的输入没有时,会使用 NULL。" + } + label { + en: "Message Value" + zh: "消息的 Value" + } + } + kafka_message_timestamp { + desc { + en: "Which timestamp to use. " + "The timestamp is expected to be a millisecond precision Unix epoch " + "which can be in string format, e.g. 1661326462115 or " + "'1661326462115'. " + "When the desired data field for this template is not found, " + "or if the found data is not a valid integer, " + "the current system timestamp will be used." + zh: "生成 Kafka 消息时间戳的模版。" + "该时间必需是一个整型数值(可以是字符串格式)例如 1661326462115 " + "或 '1661326462115'。" + "当所需的输入字段不存在,或不是一个整型时," + "则会使用当前系统时间。" + } + label { + en: "Message Timestamp" + zh: "消息的时间戳" + } + } + max_batch_bytes { + desc { + en: "Maximum bytes to collect in a Kafka message batch. " + "Most of the Kafka brokers default to a limit of 1MB batch size. " + "EMQX's default value is less than 1MB in order to compensate " + "Kafka message encoding overheads (especially when each individual message is very small). " + "When a single message is over the limit, it is still sent (as a single element batch)." + zh: "最大消息批量字节数。" + "大多数 Kafka 环境的默认最低值是 1MB,EMQX 的默认值比 1MB 更小是因为需要" + "补偿 Kafka 消息编码索需要的额外字节(尤其是当每条消息都很小的情况下)。" + "当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。" + } + label { + en: "Max Batch Bytes" + zh: "最大批量字节数" + } + } + compression { + desc { + en: "Compression method." + zh: "压缩方法。" + } + label { + en: "Compression" + zh: "压缩" + } + } + partition_strategy { + desc { + en: "Partition strategy is to tell the producer how to dispatch messages to Kafka partitions.\n\n" + "random: Randomly pick a partition for each message\n" + "key_dispatch: Hash Kafka message key to a partition number\n" + zh: "设置消息发布时应该如何选择 Kafka 分区。\n\n" + "random: 为每个消息随机选择一个分区。\n" + "key_dispatch: Hash Kafka message key to a partition number\n" + } + label { + en: "Partition Strategy" + zh: "分区选择策略" + } + } + required_acks { + desc { + en: "Required acknowledgements for Kafka partition leader to wait for its followers " + "before it sends back the acknowledgement to EMQX Kafka producer\n\n" + "all_isr: Require all in-sync replicas to acknowledge.\n" + "leader_only: Require only the partition-leader's acknowledgement.\n" + "none: No need for Kafka to acknowledge at all.\n" + zh: "设置 Kafka leader 在返回给 EMQX 确认之前需要等待多少个 follower 的确认。\n\n" + "all_isr: 需要所有的在线复制者都确认。\n" + "leader_only: 仅需要分区 leader 确认。\n" + "none: 无需 Kafka 回复任何确认。\n" + } + label { + en: "Required Acks" + zh: "Kafka 确认数量" + } + } + partition_count_refresh_interval { + desc { + en: "The time interval for Kafka producer to discover increased number of partitions.\n" + "After the number of partitions is increased in Kafka, EMQX will start taking the \n" + "discovered partitions into account when dispatching messages per partition_strategy." + zh: "配置 Kafka 刷新分区数量的时间间隔。\n" + "EMQX 发现 Kafka 分区数量增加后,会开始按 partition_strategy 配置,把消息发送到新的分区中。" + } + label { + en: "Partition Count Refresh Interval" + zh: "分区数量刷新间隔" + } + } + max_inflight { + desc { + en: "Maximum number of batches allowed for Kafka producer (per-partition) to send before receiving acknowledgement from Kafka. " + "Greater value typically means better throughput. However, there can be a risk of message reordering when this " + "value is greater than 1." + zh: "设置 Kafka 生产者(每个分区一个)在收到 Kafka 的确认前最多发送多少个请求(批量)。" + "调大这个值通常可以增加吞吐量,但是,当该值设置大于 1 是存在消息乱序的风险。" + } + label { + en: "Max Inflight" + zh: "飞行窗口" + } + } + producer_buffer { + desc { + en: "Configure producer message buffer.\n\n" + "Tell Kafka producer how to buffer messages when EMQX has more messages to send than " + "Kafka can keep up, or when Kafka is down.\n\n" + zh: "配置消息缓存的相关参数。\n\n" + "当 EMQX 需要发送的消息超过 Kafka 处理能力,或者当 Kafka 临时下线时,EMQX 内部会将消息缓存起来。" + } + label { + en: "Message Buffer" + zh: "消息缓存" + } + } + buffer_mode { + desc { + en: "Message buffer mode.\n\n" + "memory: Buffer all messages in memory. The messages will be lost in case of EMQX node restart\n" + "disc: Buffer all messages on disk. The messages on disk are able to survive EMQX node restart.\n" + "hybrid: Buffer message in memory first, when up to certain limit " + "(see segment_bytes config for more information), then start offloading " + "messages to disk, Like memory mode, the messages will be lost in case of " + "EMQX node restart." + zh: "消息缓存模式。\n" + "memory: 所有的消息都缓存在内存里。如果 EMQX 服务重启,缓存的消息会丢失。\n" + "disc: 缓存到磁盘上。EMQX 重启后会继续发送重启前未发送完成的消息。\n" + "hybrid: 先将消息缓存在内存中,当内存中的消息堆积超过一定限制" + "(配置项 segment_bytes 描述了该限制)后,后续的消息会缓存到磁盘上。" + "与 memory 模式一样,如果 EMQX 服务重启,缓存的消息会丢失。" + } + label { + en: "Buffer Mode" + zh: "缓存模式" + } + } + buffer_per_partition_limit { + desc { + en: "Number of bytes allowed to buffer for each Kafka partition. " + "When this limit is exceeded, old messages will be dropped in a trade for credits " + "for new messages to be buffered." + zh: "为每个 Kafka 分区设置的最大缓存字节数。当超过这个上限之后,老的消息会被丢弃," + "为新的消息腾出空间。" + } + label { + en: "Per-partition Buffer Limit" + zh: "Kafka 分区缓存上限" + } + } + buffer_segment_bytes { + desc { + en: "Applicable when buffer mode is set to disk or hybrid.\n" + "This value is to specify the size of each on-disk buffer file." + zh: "当缓存模式是 diskhybrid 时适用。" + "该配置用于指定缓存到磁盘上的文件的大小。" + } + label { + en: "Segment File Bytes" + zh: "缓存文件大小" + } + } + buffer_memory_overload_protection { + desc { + en: "Applicable when buffer mode is set to memory or hybrid.\n" + "EMQX will drop old cached messages under high memory pressure. " + "The high memory threshold is defined in config sysmon.os.sysmem_high_watermark." + zh: "缓存模式是 memoryhybrid 时适用。" + "当系统处于高内存压力时,从队列中丢弃旧的消息以减缓内存增长。" + "内存压力值由配置项 sysmon.os.sysmem_high_watermark 决定。" + } + label { + en: "Memory Overload Protection" + zh: "内存过载保护" + } + } + auth_username_password { + desc { + en: "Username/password based authentication." + zh: "基于用户名密码的认证。" + } + label { + en: "Username/password Auth" + zh: "用户名密码认证" + } + } + auth_gssapi_kerberos { + desc { + en: "Use GSSAPI/Kerberos authentication." + zh: "使用 GSSAPI/Kerberos 认证。" + } + label { + en: "GSSAPI/Kerberos" + zh: "GSSAPI/Kerberos" + } + } +} diff --git a/lib-ee/emqx_ee_bridge/rebar.config b/lib-ee/emqx_ee_bridge/rebar.config index e986d7983..8c79e7274 100644 --- a/lib-ee/emqx_ee_bridge/rebar.config +++ b/lib-ee/emqx_ee_bridge/rebar.config @@ -1,5 +1,9 @@ {erl_opts, [debug_info]}. {deps, [ {hocon, {git, "https://github.com/emqx/hocon.git", {tag, "0.30.0"}}} + , {wolff, {git, "https://github.com/kafka4beam/wolff.git", {tag, "1.6.4"}}} + , {kafka_protocol, {git, "https://github.com/kafka4beam/kafka_protocol.git", {tag, "4.1.0"}}} + , {brod_gssapi, {git, "https://github.com/kafka4beam/brod_gssapi.git", {tag, "v0.1.0-rc1"}}} + , {brod, {git, "https://github.com/kafka4beam/brod.git", {tag, "3.16.4"}}} , {emqx_connector, {path, "../../apps/emqx_connector"}} , {emqx_resource, {path, "../../apps/emqx_resource"}} , {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src index a578b7d0d..97c884fe9 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.app.src @@ -3,7 +3,8 @@ {registered, []}, {applications, [ kernel, - stdlib + stdlib, + emqx_ee_connector ]}, {env, []}, {modules, []}, diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl index 840b963cd..cdf0a6439 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge.erl @@ -14,6 +14,7 @@ api_schemas(Method) -> [ + ref(emqx_ee_bridge_kafka, Method), ref(emqx_ee_bridge_mysql, Method), ref(emqx_ee_bridge_mongodb, Method ++ "_rs"), ref(emqx_ee_bridge_mongodb, Method ++ "_sharded"), @@ -26,6 +27,7 @@ api_schemas(Method) -> schema_modules() -> [ + emqx_ee_bridge_kafka, emqx_ee_bridge_hstreamdb, emqx_ee_bridge_influxdb, emqx_ee_bridge_mongodb, @@ -45,6 +47,7 @@ examples(Method) -> lists:foldl(Fun, #{}, schema_modules()). resource_type(Type) when is_binary(Type) -> resource_type(binary_to_atom(Type, utf8)); +resource_type(kafka) -> emqx_bridge_impl_kafka; resource_type(hstreamdb) -> emqx_ee_connector_hstreamdb; resource_type(mongodb_rs) -> emqx_connector_mongo; resource_type(mongodb_sharded) -> emqx_connector_mongo; @@ -56,6 +59,11 @@ resource_type(influxdb_api_v2) -> emqx_ee_connector_influxdb. fields(bridges) -> [ + {kafka, + mk( + hoconsc:map(name, ref(emqx_ee_bridge_kafka, "config")), + #{desc => <<"EMQX Enterprise Config">>} + )}, {hstreamdb, mk( hoconsc:map(name, ref(emqx_ee_bridge_hstreamdb, "config")), @@ -66,8 +74,9 @@ fields(bridges) -> hoconsc:map(name, ref(emqx_ee_bridge_mysql, "config")), #{desc => <<"EMQX Enterprise Config">>} )} - ] ++ fields(mongodb) ++ fields(influxdb); -fields(mongodb) -> + ] ++ mongodb_structs() ++ influxdb_structs(). + +mongodb_structs() -> [ {Type, mk( @@ -75,8 +84,9 @@ fields(mongodb) -> #{desc => <<"EMQX Enterprise Config">>} )} || Type <- [mongodb_rs, mongodb_sharded, mongodb_single] - ]; -fields(influxdb) -> + ]. + +influxdb_structs() -> [ {Protocol, mk( diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl new file mode 100644 index 000000000..080af35d0 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -0,0 +1,260 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_kafka). + +-include_lib("emqx_bridge/include/emqx_bridge.hrl"). +-include_lib("emqx_connector/include/emqx_connector.hrl"). +-include_lib("typerefl/include/types.hrl"). +-include_lib("hocon/include/hoconsc.hrl"). + +%% allow atoms like scram_sha_256 and scram_sha_512 +%% i.e. the _256 part does not start with a-z +-elvis([ + {elvis_style, atom_naming_convention, #{ + regex => "^([a-z][a-z0-9]*_?)([a-z0-9]*_?)*$", + enclosed_atoms => ".*" + }} +]). +-import(hoconsc, [mk/2, enum/1, ref/2]). + +-export([ + conn_bridge_examples/1 +]). + +-export([ + namespace/0, + roots/0, + fields/1, + desc/1 +]). + +%% ------------------------------------------------------------------------------------------------- +%% api + +conn_bridge_examples(Method) -> + [ + #{ + <<"kafka">> => #{ + summary => <<"Kafka Bridge">>, + value => values(Method) + } + } + ]. + +values(get) -> + maps:merge(values(post), ?METRICS_EXAMPLE); +values(post) -> + #{ + bootstrap_hosts => <<"localhost:9092">> + }; +values(put) -> + values(post). + +%% ------------------------------------------------------------------------------------------------- +%% Hocon Schema Definitions + +namespace() -> "bridge_kafka". + +roots() -> ["config"]. + +fields("post") -> + [type_field(), name_field() | fields("config")]; +fields("put") -> + fields("config"); +fields("get") -> + emqx_bridge_schema:metrics_status_fields() ++ fields("post"); +fields("config") -> + [ + {enable, mk(boolean(), #{desc => ?DESC("config_enable"), default => true})}, + {bootstrap_hosts, mk(binary(), #{required => true, desc => ?DESC(bootstrap_hosts)})}, + {connect_timeout, + mk(emqx_schema:duration_ms(), #{ + default => "5s", + desc => ?DESC(connect_timeout) + })}, + {min_metadata_refresh_interval, + mk( + emqx_schema:duration_ms(), + #{ + default => "3s", + desc => ?DESC(min_metadata_refresh_interval) + } + )}, + {metadata_request_timeout, + mk(emqx_schema:duration_ms(), #{ + default => "5s", + desc => ?DESC(metadata_request_timeout) + })}, + {authentication, + mk(hoconsc:union([none, ref(auth_username_password), ref(auth_gssapi_kerberos)]), #{ + default => none, desc => ?DESC("authentication") + })}, + {producer, mk(hoconsc:union([none, ref(producer_opts)]), #{desc => ?DESC(producer_opts)})}, + %{consumer, mk(hoconsc:union([none, ref(consumer_opts)]), #{desc => ?DESC(consumer_opts)})}, + {socket_opts, mk(ref(socket_opts), #{required => false, desc => ?DESC(socket_opts)})} + ] ++ emqx_connector_schema_lib:ssl_fields(); +fields(auth_username_password) -> + [ + {mechanism, + mk(enum([plain, scram_sha_256, scram_sha_512]), #{ + required => true, desc => ?DESC(auth_sasl_mechanism) + })}, + {username, mk(binary(), #{required => true, desc => ?DESC(auth_sasl_username)})}, + {password, + mk(binary(), #{required => true, sensitive => true, desc => ?DESC(auth_sasl_password)})} + ]; +fields(auth_gssapi_kerberos) -> + [ + {kerberos_principal, + mk(binary(), #{ + required => true, + desc => ?DESC(auth_kerberos_principal) + })}, + {kerberos_keytab_file, + mk(binary(), #{ + required => true, + desc => ?DESC(auth_kerberos_keytab_file) + })} + ]; +fields(socket_opts) -> + [ + {sndbuf, + mk( + emqx_schema:bytesize(), + #{default => "1024KB", desc => ?DESC(socket_send_buffer)} + )}, + {recbuf, + mk( + emqx_schema:bytesize(), + #{default => "1024KB", desc => ?DESC(socket_receive_buffer)} + )}, + {nodelay, + mk( + boolean(), + #{default => true, desc => ?DESC(socket_nodelay)} + )} + ]; +fields(producer_opts) -> + [ + {mqtt, mk(ref(producer_mqtt_opts), #{desc => ?DESC(producer_mqtt_opts)})}, + {kafka, + mk(ref(producer_kafka_opts), #{ + required => true, + desc => ?DESC(producer_kafka_opts) + })} + ]; +fields(producer_mqtt_opts) -> + [{topic, mk(string(), #{desc => ?DESC(mqtt_topic)})}]; +fields(producer_kafka_opts) -> + [ + {topic, mk(string(), #{required => true, desc => ?DESC(kafka_topic)})}, + {message, mk(ref(kafka_message), #{required => false, desc => ?DESC(kafka_message)})}, + {max_batch_bytes, + mk(emqx_schema:bytesize(), #{default => "896KB", desc => ?DESC(max_batch_bytes)})}, + {compression, + mk(enum([no_compression, snappy, gzip]), #{ + default => no_compression, desc => ?DESC(compression) + })}, + {partition_strategy, + mk( + enum([random, key_dispatch]), + #{default => random, desc => ?DESC(partition_strategy)} + )}, + {required_acks, + mk( + enum([all_isr, leader_only, none]), + #{ + default => all_isr, + desc => ?DESC(required_acks) + } + )}, + {partition_count_refresh_interval, + mk( + emqx_schema:duration_s(), + #{ + default => "60s", + desc => ?DESC(partition_count_refresh_interval) + } + )}, + {max_inflight, + mk( + pos_integer(), + #{ + default => 10, + desc => ?DESC(max_inflight) + } + )}, + {buffer, + mk(ref(producer_buffer), #{ + required => false, + desc => ?DESC(producer_buffer) + })} + ]; +fields(kafka_message) -> + [ + {key, mk(string(), #{default => "${clientid}", desc => ?DESC(kafka_message_key)})}, + {value, mk(string(), #{default => "${payload}", desc => ?DESC(kafka_message_value)})}, + {timestamp, + mk(string(), #{ + default => "${timestamp}", desc => ?DESC(kafka_message_timestamp) + })} + ]; +fields(producer_buffer) -> + [ + {mode, + mk( + enum([memory, disk, hybrid]), + #{default => memory, desc => ?DESC(buffer_mode)} + )}, + {per_partition_limit, + mk( + emqx_schema:bytesize(), + #{default => "2GB", desc => ?DESC(buffer_per_partition_limit)} + )}, + {segment_bytes, + mk( + emqx_schema:bytesize(), + #{default => "100MB", desc => ?DESC(buffer_segment_bytes)} + )}, + {memory_overload_protection, + mk(boolean(), #{ + %% different from 4.x + default => true, + desc => ?DESC(buffer_memory_overload_protection) + })} + ]. + +% fields(consumer_opts) -> +% [ +% {kafka, mk(ref(consumer_kafka_opts), #{required => true, desc => ?DESC(consumer_kafka_opts)})}, +% {mqtt, mk(ref(consumer_mqtt_opts), #{required => true, desc => ?DESC(consumer_mqtt_opts)})} +% ]; +% fields(consumer_mqtt_opts) -> +% [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})} +% ]; + +% fields(consumer_mqtt_opts) -> +% [ {topic, mk(string(), #{desc => ?DESC(consumer_mqtt_topic)})} +% ]; +% fields(consumer_kafka_opts) -> +% [ {topic, mk(string(), #{desc => ?DESC(consumer_kafka_topic)})} +% ]. + +desc("config") -> + ?DESC("desc_config"); +desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> + ["Configuration for Kafka using `", string:to_upper(Method), "` method."]; +desc(_) -> + undefined. + +%% ------------------------------------------------------------------------------------------------- +%% internal +type_field() -> + {type, mk(enum([kafka]), #{required => true, desc => ?DESC("desc_type")})}. + +name_field() -> + {name, mk(binary(), #{required => true, desc => ?DESC("desc_name")})}. + +ref(Name) -> + hoconsc:ref(?MODULE, Name). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl new file mode 100644 index 000000000..d1fad4765 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka.erl @@ -0,0 +1,33 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +%% Kafka connection configuration +-module(emqx_bridge_impl_kafka). +-behaviour(emqx_resource). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_get_status/2, + is_buffer_supported/0 +]). + +is_buffer_supported() -> true. + +callback_mode() -> async_if_possible. + +on_start(InstId, Config) -> + emqx_bridge_impl_kafka_producer:on_start(InstId, Config). + +on_stop(InstId, State) -> + emqx_bridge_impl_kafka_producer:on_stop(InstId, State). + +on_query(InstId, Msg, State) -> + emqx_bridge_impl_kafka_producer:on_query(InstId, Msg, State). + +on_get_status(InstId, State) -> + emqx_bridge_impl_kafka_producer:on_get_status(InstId, State). diff --git a/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl new file mode 100644 index 000000000..ce82dbe2d --- /dev/null +++ b/lib-ee/emqx_ee_bridge/src/kafka/emqx_bridge_impl_kafka_producer.erl @@ -0,0 +1,270 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_bridge_impl_kafka_producer). + +%% callbacks of behaviour emqx_resource +-export([ + callback_mode/0, + on_start/2, + on_stop/2, + on_query/3, + on_get_status/2 +]). + +-export([on_kafka_ack/3]). + +-include_lib("emqx/include/logger.hrl"). + +callback_mode() -> async_if_possible. + +%% @doc Config schema is defined in emqx_ee_bridge_kafka. +on_start(InstId, Config) -> + #{ + bridge_name := BridgeName, + bootstrap_hosts := Hosts0, + connect_timeout := ConnTimeout, + metadata_request_timeout := MetaReqTimeout, + min_metadata_refresh_interval := MinMetaRefreshInterval, + socket_opts := SocketOpts, + authentication := Auth, + ssl := SSL + } = Config, + %% it's a bug if producer config is not found + %% the caller should not try to start a producer if + %% there is no producer config + ProducerConfigWrapper = get_required(producer, Config, no_kafka_producer_config), + ProducerConfig = get_required(kafka, ProducerConfigWrapper, no_kafka_producer_parameters), + MessageTemplate = get_required(message, ProducerConfig, no_kafka_message_template), + Hosts = hosts(Hosts0), + ClientId = make_client_id(BridgeName), + ClientConfig = #{ + min_metadata_refresh_interval => MinMetaRefreshInterval, + connect_timeout => ConnTimeout, + client_id => ClientId, + request_timeout => MetaReqTimeout, + extra_sock_opts => socket_opts(SocketOpts), + sasl => sasl(Auth), + ssl => ssl(SSL) + }, + #{ + topic := KafkaTopic + } = ProducerConfig, + case wolff:ensure_supervised_client(ClientId, Hosts, ClientConfig) of + {ok, _} -> + ?SLOG(info, #{ + msg => "kafka_client_started", + instance_id => InstId, + kafka_hosts => Hosts + }); + {error, Reason} -> + ?SLOG(error, #{ + msg => "failed_to_start_kafka_client", + instance_id => InstId, + kafka_hosts => Hosts, + reason => Reason + }), + throw(failed_to_start_kafka_client) + end, + WolffProducerConfig = producers_config(BridgeName, ClientId, ProducerConfig), + case wolff:ensure_supervised_producers(ClientId, KafkaTopic, WolffProducerConfig) of + {ok, Producers} -> + {ok, #{ + message_template => compile_message_template(MessageTemplate), + client_id => ClientId, + producers => Producers + }}; + {error, Reason2} -> + ?SLOG(error, #{ + msg => "failed_to_start_kafka_producer", + instance_id => InstId, + kafka_hosts => Hosts, + kafka_topic => KafkaTopic, + reason => Reason2 + }), + throw(failed_to_start_kafka_producer) + end. + +on_stop(_InstId, #{client_id := ClientID, producers := Producers}) -> + with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_producers(Producers) end, + #{ + msg => "failed_to_delete_kafka_producer", + client_id => ClientID + } + ), + with_log_at_error( + fun() -> wolff:stop_and_delete_supervised_client(ClientID) end, + #{ + msg => "failed_to_delete_kafka_client", + client_id => ClientID + } + ). + +%% @doc The callback API for rule-engine (or bridge without rules) +%% The input argument `Message' is an enriched format (as a map()) +%% of the original #message{} record. +%% The enrichment is done by rule-engine or by the data bridge framework. +%% E.g. the output of rule-engine process chain +%% or the direct mapping from an MQTT message. +on_query(_InstId, {send_message, Message}, #{message_template := Template, producers := Producers}) -> + KafkaMessage = render_message(Template, Message), + %% The retuned information is discarded here. + %% If the producer process is down when sending, this function would + %% raise an error exception which is to be caught by the caller of this callback + {_Partition, _Pid} = wolff:send(Producers, [KafkaMessage], {fun ?MODULE:on_kafka_ack/3, [#{}]}), + ok. + +compile_message_template(#{ + key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate +}) -> + #{ + key => emqx_plugin_libs_rule:preproc_tmpl(KeyTemplate), + value => emqx_plugin_libs_rule:preproc_tmpl(ValueTemplate), + timestamp => emqx_plugin_libs_rule:preproc_tmpl(TimestampTemplate) + }. + +render_message( + #{key := KeyTemplate, value := ValueTemplate, timestamp := TimestampTemplate}, Message +) -> + #{ + key => render(KeyTemplate, Message), + value => render(ValueTemplate, Message), + ts => render_timestamp(TimestampTemplate, Message) + }. + +render(Template, Message) -> + emqx_plugin_libs_rule:proc_tmpl(Template, Message). + +render_timestamp(Template, Message) -> + try + binary_to_integer(render(Template, Message)) + catch + _:_ -> + erlang:system_time(millisecond) + end. + +on_kafka_ack(_Partition, _Offset, _Extra) -> + %% Do nothing so far. + %% Maybe need to bump some counters? + ok. + +on_get_status(_InstId, _State) -> + connected. + +%% Parse comma separated host:port list into a [{Host,Port}] list +hosts(Hosts) when is_binary(Hosts) -> + hosts(binary_to_list(Hosts)); +hosts(Hosts) when is_list(Hosts) -> + kpro:parse_endpoints(Hosts). + +%% Extra socket options, such as sndbuf size etc. +socket_opts(Opts) when is_map(Opts) -> + socket_opts(maps:to_list(Opts)); +socket_opts(Opts) when is_list(Opts) -> + socket_opts_loop(Opts, []). + +socket_opts_loop([], Acc) -> + lists:reverse(Acc); +socket_opts_loop([{T, Bytes} | Rest], Acc) when + T =:= sndbuf orelse T =:= recbuf orelse T =:= buffer +-> + Acc1 = [{T, Bytes} | adjust_socket_buffer(Bytes, Acc)], + socket_opts_loop(Rest, Acc1); +socket_opts_loop([Other | Rest], Acc) -> + socket_opts_loop(Rest, [Other | Acc]). + +%% https://www.erlang.org/doc/man/inet.html +%% For TCP it is recommended to have val(buffer) >= val(recbuf) +%% to avoid performance issues because of unnecessary copying. +adjust_socket_buffer(Bytes, Opts) -> + case lists:keytake(buffer, 1, Opts) of + false -> + [{buffer, Bytes} | Opts]; + {value, {buffer, Bytes1}, Acc1} -> + [{buffer, max(Bytes1, Bytes)} | Acc1] + end. + +sasl(none) -> + undefined; +sasl(#{mechanism := Mechanism, username := Username, password := Password}) -> + {Mechanism, Username, emqx_secret:wrap(Password)}; +sasl(#{ + kerberos_principal := Principal, + kerberos_keytab_file := KeyTabFile +}) -> + {callback, brod_gssapi, {gssapi, KeyTabFile, Principal}}. + +ssl(#{enable := true} = SSL) -> + emqx_tls_lib:to_client_opts(SSL); +ssl(_) -> + []. + +producers_config(BridgeName, ClientId, Input) -> + #{ + max_batch_bytes := MaxBatchBytes, + compression := Compression, + partition_strategy := PartitionStrategy, + required_acks := RequiredAcks, + partition_count_refresh_interval := PCntRefreshInterval, + max_inflight := MaxInflight, + buffer := #{ + mode := BufferMode, + per_partition_limit := PerPartitionLimit, + segment_bytes := SegmentBytes, + memory_overload_protection := MemOLP + } + } = Input, + + {OffloadMode, ReplayqDir} = + case BufferMode of + memory -> {false, false}; + disk -> {false, replayq_dir(ClientId)}; + hybrid -> {true, replayq_dir(ClientId)} + end, + #{ + name => make_producer_name(BridgeName), + partitioner => PartitionStrategy, + partition_count_refresh_interval_seconds => PCntRefreshInterval, + replayq_dir => ReplayqDir, + replayq_offload_mode => OffloadMode, + replayq_max_total_bytes => PerPartitionLimit, + replayq_seg_bytes => SegmentBytes, + drop_if_highmem => MemOLP, + required_acks => RequiredAcks, + max_batch_bytes => MaxBatchBytes, + max_send_ahead => MaxInflight - 1, + compression => Compression + }. + +replayq_dir(ClientId) -> + filename:join([emqx:data_dir(), "kafka", ClientId]). + +%% Client ID is better to be unique to make it easier for Kafka side trouble shooting. +make_client_id(BridgeName) when is_atom(BridgeName) -> + make_client_id(atom_to_list(BridgeName)); +make_client_id(BridgeName) -> + iolist_to_binary([BridgeName, ":", atom_to_list(node())]). + +%% Producer name must be an atom which will be used as a ETS table name for +%% partition worker lookup. +make_producer_name(BridgeName) when is_atom(BridgeName) -> + make_producer_name(atom_to_list(BridgeName)); +make_producer_name(BridgeName) -> + list_to_atom("kafka_producer_" ++ BridgeName). + +with_log_at_error(Fun, Log) -> + try + Fun() + catch + C:E -> + ?SLOG(error, Log#{ + exception => C, + reason => E + }) + end. + +get_required(Field, Config, Throw) -> + Value = maps:get(Field, Config, none), + Value =:= none andalso throw(Throw), + Value. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl new file mode 100644 index 000000000..8d01d0d69 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -0,0 +1,90 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_impl_kafka_producer_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("brod/include/brod.hrl"). + +-define(PRODUCER, emqx_bridge_impl_kafka). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + {ok, _} = application:ensure_all_started(brod), + {ok, _} = application:ensure_all_started(wolff), + Config. + +end_per_suite(_) -> + ok. + +t_publish(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "kafka_hosts_string" => kafka_hosts_string(), + "kafka_topic" => KafkaTopic + }), + InstId = <<"InstanceID">>, + Time = erlang:system_time(millisecond), + BinTime = integer_to_binary(Time), + Msg = #{ + clientid => BinTime, + payload => <<"payload">>, + timestamp => Time + }, + {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), + ct:pal("base offset before testing ~p", [Offset]), + {ok, State} = ?PRODUCER:on_start(InstId, Conf), + ok = ?PRODUCER:on_query(InstId, {send_message, Msg}, State), + {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), + ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), + ok = ?PRODUCER:on_stop(InstId, State), + ok. + +config(Args) -> + {ok, Conf} = hocon:binary(hocon_config(Args)), + #{config := Parsed} = hocon_tconf:check_plain( + emqx_ee_bridge_kafka, + #{<<"config">> => Conf}, + #{atom_key => true} + ), + Parsed#{bridge_name => "testbridge"}. + +hocon_config(Args) -> + Hocon = bbmustache:render(iolist_to_binary(hocon_config_template()), Args), + Hocon. + +%% erlfmt-ignore +hocon_config_template() -> +""" +bootstrap_hosts = \"{{ kafka_hosts_string }}\" +enable = true +authentication = none +producer = { + mqtt { + topic = \"t/#\" + } + kafka = { + topic = \"{{ kafka_topic }}\" + } +} +""". + +kafka_hosts_string() -> + "kafka-1.emqx.net:9092,". + +kafka_hosts() -> + kpro:parse_endpoints(kafka_hosts_string()). + +resolve_kafka_offset(Hosts, Topic, Partition) -> + brod:resolve_offset(Hosts, Topic, Partition, latest). diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src index 675a934aa..c1b86d20b 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector.app.src @@ -5,7 +5,9 @@ kernel, stdlib, hstreamdb_erl, - influxdb + influxdb, + wolff, + brod ]}, {env, []}, {modules, []}, diff --git a/mix.exs b/mix.exs index d871ddf82..2b7f1419b 100644 --- a/mix.exs +++ b/mix.exs @@ -44,7 +44,7 @@ defmodule EMQXUmbrella.MixProject do # we need several overrides here because dependencies specify # other exact versions, and not ranges. [ - {:lc, github: "emqx/lc", tag: "0.3.1"}, + {:lc, github: "emqx/lc", tag: "0.3.2", override: true}, {:redbug, "2.0.7"}, {:typerefl, github: "ieQu1/typerefl", tag: "0.9.1", override: true}, {:ehttpc, github: "emqx/ehttpc", tag: "0.4.0", override: true}, @@ -57,7 +57,7 @@ defmodule EMQXUmbrella.MixProject do {:grpc, github: "emqx/grpc-erl", tag: "0.6.6", override: true}, {:minirest, github: "emqx/minirest", tag: "1.3.7", override: true}, {:ecpool, github: "emqx/ecpool", tag: "0.5.2", override: true}, - {:replayq, "0.3.4", override: true}, + {:replayq, github: "emqx/replayq", tag: "0.3.4", override: true}, {:pbkdf2, github: "emqx/erlang-pbkdf2", tag: "2.0.4", override: true}, {:emqtt, github: "emqx/emqtt", tag: "1.7.0-rc.1", override: true}, {:rulesql, github: "emqx/rulesql", tag: "0.1.4"}, @@ -129,7 +129,13 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.2.5"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true} + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.3", override: true}, + {:wolff, github: "kafka4beam/wolff", tag: "1.6.4"}, + {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.0", override: true}, + {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.0-rc1"}, + {:brod, github: "kafka4beam/brod", tag: "3.16.4"}, + {:snappyer, "1.2.8", override: true}, + {:supervisor3, "1.1.11", override: true} ] end diff --git a/rebar.config b/rebar.config index e29c5f2c7..e5bab7c6d 100644 --- a/rebar.config +++ b/rebar.config @@ -44,7 +44,7 @@ {post_hooks,[]}. {deps, - [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}} + [ {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}} , {redbug, "2.0.7"} , {gpb, "4.11.2"} %% gpb only used to build, but not for release, pin it here to avoid fetching a wrong version due to rebar plugins scattered in all the deps , {typerefl, {git, "https://github.com/ieQu1/typerefl", {tag, "0.9.1"}}} @@ -59,7 +59,7 @@ , {grpc, {git, "https://github.com/emqx/grpc-erl", {tag, "0.6.6"}}} , {minirest, {git, "https://github.com/emqx/minirest", {tag, "1.3.7"}}} , {ecpool, {git, "https://github.com/emqx/ecpool", {tag, "0.5.2"}}} - , {replayq, "0.3.4"} + , {replayq, {git, "https://github.com/emqx/replayq.git", {tag, "0.3.4"}}} , {pbkdf2, {git, "https://github.com/emqx/erlang-pbkdf2.git", {tag, "2.0.4"}}} , {emqtt, {git, "https://github.com/emqx/emqtt", {tag, "1.7.0-rc.1"}}} , {rulesql, {git, "https://github.com/emqx/rulesql", {tag, "0.1.4"}}} diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 2c87bb0cf..c478ce005 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -10,12 +10,20 @@ cd -P -- "$(dirname -- "${BASH_SOURCE[0]}")/../.." help() { echo echo "-h|--help: To display this usage info" - echo "--app lib_dir/app_name: Print apps in json" + echo "--app lib_dir/app_name: For which app to run start docker-compose, and run common tests" + echo "--suites SUITE1,SUITE2: Comma separated SUITE names to run. e.g. apps/emqx/test/emqx_SUITE.erl" echo "--console: Start EMQX in console mode" + echo "--attach: Attach to the Erlang docker container without running any test case" + echo "--only-up: Keep the testbed running after CT" + echo "--keep-up: Only start the testbed but do not run CT" } WHICH_APP='novalue' CONSOLE='no' +KEEP_UP='no' +ONLY_UP='no' +SUITES='' +ATTACH='no' while [ "$#" -gt 0 ]; do case $1 in -h|--help) @@ -26,10 +34,26 @@ while [ "$#" -gt 0 ]; do WHICH_APP="$2" shift 2 ;; + --only-up) + ONLY_UP='yes' + shift 1 + ;; + --keep-up) + KEEP_UP='yes' + shift 1 + ;; + --attach) + ATTACH='yes' + shift 1 + ;; --console) CONSOLE='yes' shift 1 ;; + --suites) + SUITES="$2" + shift 2 + ;; *) echo "unknown option $1" exit 1 @@ -45,6 +69,16 @@ fi ERLANG_CONTAINER='erlang24' DOCKER_CT_ENVS_FILE="${WHICH_APP}/docker-ct" +case "${WHICH_APP}" in + lib-ee*) + ## ensure enterprise profile when testing lib-ee applications + export PROFILE='emqx-enterprise' + ;; + *) + true + ;; +esac + if [ -f "$DOCKER_CT_ENVS_FILE" ]; then # shellcheck disable=SC2002 CT_DEPS="$(cat "$DOCKER_CT_ENVS_FILE" | xargs)" @@ -80,6 +114,9 @@ for dep in ${CT_DEPS}; do FILES+=( '.ci/docker-compose-file/docker-compose-pgsql-tcp.yaml' '.ci/docker-compose-file/docker-compose-pgsql-tls.yaml' ) ;; + kafka) + FILES+=( '.ci/docker-compose-file/docker-compose-kafka.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 @@ -104,13 +141,23 @@ if [[ -t 1 ]]; then fi docker exec -i $TTY "$ERLANG_CONTAINER" bash -c 'git config --global --add safe.directory /emqx' -if [ "$CONSOLE" = 'yes' ]; then +if [ "$ONLY_UP" = 'yes' ]; then + exit 0 +fi + +if [ "$ATTACH" = 'yes' ]; then + docker exec -it "$ERLANG_CONTAINER" bash +elif [ "$CONSOLE" = 'yes' ]; then docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run" else set +e - docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct" + docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct" RESULT=$? - # shellcheck disable=2086 # no quotes for F_OPTIONS - docker-compose $F_OPTIONS down - exit $RESULT + if [ "$KEEP_UP" = 'yes' ]; then + exit $RESULT + else + # shellcheck disable=2086 # no quotes for F_OPTIONS + docker-compose $F_OPTIONS down + exit $RESULT + fi fi diff --git a/scripts/find-suites.sh b/scripts/find-suites.sh index 4d2fd3bee..e7c1b422e 100755 --- a/scripts/find-suites.sh +++ b/scripts/find-suites.sh @@ -8,5 +8,9 @@ set -euo pipefail # ensure dir cd -P -- "$(dirname -- "$0")/.." -TESTDIR="$1/test" -find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ',' +if [ -z "${EMQX_CT_SUITES:-}" ]; then + TESTDIR="$1/test" + find "${TESTDIR}" -name "*_SUITE.erl" -print0 2>/dev/null | xargs -0 | tr ' ' ',' +else + echo "${EMQX_CT_SUITES}" +fi From f0e03086a639390df54b5698d68ce2a1cbcb9a59 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Mon, 12 Sep 2022 20:54:58 +0200 Subject: [PATCH 02/17] test: add test cases for Kafka SASL auth mechanisms plain and scram --- .../docker-compose-kafka.yaml | 15 +++- .ci/docker-compose-file/kafka/jaas.conf | 9 +++ .../kafka/run_add_scram_users.sh | 26 ++++++ .../emqx_bridge_impl_kafka_producer_SUITE.erl | 80 +++++++++++++++++-- scripts/ct/run.sh | 6 +- 5 files changed, 122 insertions(+), 14 deletions(-) create mode 100644 .ci/docker-compose-file/kafka/jaas.conf create mode 100755 .ci/docker-compose-file/kafka/run_add_scram_users.sh diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index 85532725d..edde553af 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -13,15 +13,24 @@ services: image: wurstmeister/kafka:2.13-2.7.0 ports: - "9092:9092" + - "9093:9093" container_name: kafka-1.emqx.net hostname: kafka-1.emqx.net environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: PLAINTEXT://:9092 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 + KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, networks: emqx_bridge: + volumes: + - ./kafka/jaas.conf:/etc/kafka/jaas.conf + - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh + command: run_add_scram_users.sh diff --git a/.ci/docker-compose-file/kafka/jaas.conf b/.ci/docker-compose-file/kafka/jaas.conf new file mode 100644 index 000000000..bf6e6716b --- /dev/null +++ b/.ci/docker-compose-file/kafka/jaas.conf @@ -0,0 +1,9 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + user_admin="password" + user_emqxuser="password"; + + org.apache.kafka.common.security.scram.ScramLoginModule required + username="admin" + password="password"; +}; diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh new file mode 100755 index 000000000..3a3d2ee21 --- /dev/null +++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +set -euo pipefail + +echo "+++++++ Starting Kafka ++++++++" + +start-kafka.sh & + +SERVER=localhost +PORT1=9092 +PORT2=9093 +TIMEOUT=60 + +echo "+++++++ Wait until Kafka ports are up ++++++++" + +timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 + +timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT2 + +echo "+++++++ Run config commands ++++++++" + +kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=password],SCRAM-SHA-512=[password=password]' --entity-type users --entity-name emqxuser + +echo "+++++++ Wait until Kafka ports are down ++++++++" + +bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 8d01d0d69..0eb393d4d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -28,12 +28,7 @@ init_per_suite(Config) -> end_per_suite(_) -> ok. -t_publish(_CtConfig) -> - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "kafka_hosts_string" => kafka_hosts_string(), - "kafka_topic" => KafkaTopic - }), +do_publish(Conf, KafkaTopic) -> InstId = <<"InstanceID">>, Time = erlang:system_time(millisecond), BinTime = integer_to_binary(Time), @@ -51,6 +46,54 @@ t_publish(_CtConfig) -> ok = ?PRODUCER:on_stop(InstId, State), ok. +t_publish(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => "none", + "kafka_hosts_string" => kafka_hosts_string(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + +t_publish_sasl_plain(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => #{ + "mechanism" => "plain", + "username" => "emqxuser", + "password" => "password" + }, + "kafka_hosts_string" => kafka_hosts_string_sasl(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + +t_publish_sasl_scram256(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => #{ + "mechanism" => "scram_sha_256", + "username" => "emqxuser", + "password" => "password" + }, + "kafka_hosts_string" => kafka_hosts_string_sasl(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + +t_publish_sasl_scram512(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => #{ + "mechanism" => "scram_sha_512", + "username" => "emqxuser", + "password" => "password" + }, + "kafka_hosts_string" => kafka_hosts_string_sasl(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + config(Args) -> {ok, Conf} = hocon:binary(hocon_config(Args)), #{config := Parsed} = hocon_tconf:check_plain( @@ -61,7 +104,13 @@ config(Args) -> Parsed#{bridge_name => "testbridge"}. hocon_config(Args) -> - Hocon = bbmustache:render(iolist_to_binary(hocon_config_template()), Args), + AuthConf = maps:get("authentication", Args), + AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)), + AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf), + Hocon = bbmustache:render( + iolist_to_binary(hocon_config_template()), + Args#{"authentication" => AuthConfRendered} + ), Hocon. %% erlfmt-ignore @@ -69,7 +118,7 @@ hocon_config_template() -> """ bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true -authentication = none +authentication = {{{ authentication }}} producer = { mqtt { topic = \"t/#\" @@ -80,9 +129,24 @@ producer = { } """. +%% erlfmt-ignore +hocon_config_template_authentication("none") -> + "none"; +hocon_config_template_authentication(#{"mechanism" := _}) -> +""" +{ + mechanism = {{ mechanism }} + password = {{ password }} + username = {{ username }} +} +""". + kafka_hosts_string() -> "kafka-1.emqx.net:9092,". +kafka_hosts_string_sasl() -> + "kafka-1.emqx.net:9093,". + kafka_hosts() -> kpro:parse_endpoints(kafka_hosts_string()). diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index c478ce005..45d32767c 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -14,8 +14,8 @@ help() { echo "--suites SUITE1,SUITE2: Comma separated SUITE names to run. e.g. apps/emqx/test/emqx_SUITE.erl" echo "--console: Start EMQX in console mode" echo "--attach: Attach to the Erlang docker container without running any test case" - echo "--only-up: Keep the testbed running after CT" - echo "--keep-up: Only start the testbed but do not run CT" + echo "--only-up: Only start the testbed but do not run CT" + echo "--keep-up: Keep the testbed running after CT" } WHICH_APP='novalue' @@ -151,7 +151,7 @@ elif [ "$CONSOLE" = 'yes' ]; then docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run" else set +e - docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct" + docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "BUILD_WITHOUT_QUIC=1 make ${WHICH_APP}-ct" RESULT=$? if [ "$KEEP_UP" = 'yes' ]; then exit $RESULT From e45c99bf79dd1ee1bce6a0947531ffa1183565a0 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Mon, 12 Sep 2022 10:51:35 +0200 Subject: [PATCH 03/17] fix: kafka bridge schema --- .../i18n/emqx_ee_bridge_kafka.conf | 8 ++++---- .../emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl | 17 +++++++++++++++-- 2 files changed, 19 insertions(+), 6 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf index a47bf1249..1fdbfedc4 100644 --- a/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf +++ b/lib-ee/emqx_ee_bridge/i18n/emqx_ee_bridge_kafka.conf @@ -219,7 +219,7 @@ emqx_ee_bridge_kafka { socket_nodelay { desc { en: "When set to 'true', TCP buffer sent as soon as possible. " - "Otherwise the OS kernel may buffer small TCP packets for a while (40ms by default)." + "Otherwise, the OS kernel may buffer small TCP packets for a while (40 ms by default)." zh: "设置 ‘true' 让系统内核立即发送。否则当需要发送当内容很少时,可能会有一定延迟(默认 40 毫秒)。" } label { @@ -294,12 +294,12 @@ emqx_ee_bridge_kafka { max_batch_bytes { desc { en: "Maximum bytes to collect in a Kafka message batch. " - "Most of the Kafka brokers default to a limit of 1MB batch size. " - "EMQX's default value is less than 1MB in order to compensate " + "Most of the Kafka brokers default to a limit of 1 MB batch size. " + "EMQX's default value is less than 1 MB in order to compensate " "Kafka message encoding overheads (especially when each individual message is very small). " "When a single message is over the limit, it is still sent (as a single element batch)." zh: "最大消息批量字节数。" - "大多数 Kafka 环境的默认最低值是 1MB,EMQX 的默认值比 1MB 更小是因为需要" + "大多数 Kafka 环境的默认最低值是 1 MB,EMQX 的默认值比 1 MB 更小是因为需要" "补偿 Kafka 消息编码索需要的额外字节(尤其是当每条消息都很小的情况下)。" "当单个消息的大小超过该限制时,它仍然会被发送,(相当于该批量中只有单个消息)。" } diff --git a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl index 080af35d0..ac5177f6e 100644 --- a/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl +++ b/lib-ee/emqx_ee_bridge/src/emqx_ee_bridge_kafka.erl @@ -245,8 +245,21 @@ desc("config") -> ?DESC("desc_config"); desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" -> ["Configuration for Kafka using `", string:to_upper(Method), "` method."]; -desc(_) -> - undefined. +desc(Name) -> + lists:member(Name, struct_names()) orelse throw({missing_desc, Name}), + ?DESC(Name). + +struct_names() -> + [ + auth_gssapi_kerberos, + auth_username_password, + kafka_message, + producer_buffer, + producer_kafka_opts, + producer_mqtt_opts, + socket_opts, + producer_opts + ]. %% ------------------------------------------------------------------------------------------------- %% internal From 5820b028cb263002a7937ad637200b537c909397 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Wed, 14 Sep 2022 15:52:27 +0200 Subject: [PATCH 04/17] feat: add test case for Kerberos Kafka authentication --- .../docker-compose-kafka.yaml | 24 ++++++++++++++++-- .ci/docker-compose-file/docker-compose.yaml | 6 +++++ .ci/docker-compose-file/kafka/jaas.conf | 7 ++++++ .../kafka/run_add_scram_users.sh | 9 +++++++ .ci/docker-compose-file/kerberos/krb5.conf | 23 +++++++++++++++++ .ci/docker-compose-file/kerberos/run.sh | 25 +++++++++++++++++++ .../emqx_bridge_impl_kafka_producer_SUITE.erl | 19 ++++++++++++++ 7 files changed, 111 insertions(+), 2 deletions(-) create mode 100644 .ci/docker-compose-file/kerberos/krb5.conf create mode 100755 .ci/docker-compose-file/kerberos/run.sh diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index edde553af..a4a064c16 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -16,6 +16,8 @@ services: - "9093:9093" container_name: kafka-1.emqx.net hostname: kafka-1.emqx.net + depends_on: + - "kdc" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 @@ -23,14 +25,32 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093 KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI + KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true - KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer networks: emqx_bridge: volumes: + - emqx-shared-secret:/var/lib/secret - ./kafka/jaas.conf:/etc/kafka/jaas.conf - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh + - ./kerberos/krb5.conf:/etc/kdc/krb5.conf + - ./kerberos/krb5.conf:/etc/krb5.conf command: run_add_scram_users.sh + kdc: + hostname: kdc.emqx.net + image: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04 + container_name: kdc.emqx.net + networks: + emqx_bridge: + volumes: + - emqx-shared-secret:/var/lib/secret + - ./kerberos/krb5.conf:/etc/kdc/krb5.conf + - ./kerberos/krb5.conf:/etc/krb5.conf + - ./kerberos/run.sh:/usr/bin/run.sh + command: run.sh + diff --git a/.ci/docker-compose-file/docker-compose.yaml b/.ci/docker-compose-file/docker-compose.yaml index 2612eb8d8..8db53d562 100644 --- a/.ci/docker-compose-file/docker-compose.yaml +++ b/.ci/docker-compose-file/docker-compose.yaml @@ -18,6 +18,9 @@ services: - emqx_bridge volumes: - ../..:/emqx + - emqx-shared-secret:/var/lib/secret + - ./kerberos/krb5.conf:/etc/kdc/krb5.conf + - ./kerberos/krb5.conf:/etc/krb5.conf working_dir: /emqx tty: true @@ -33,3 +36,6 @@ networks: gateway: 172.100.239.1 - subnet: 2001:3200:3200::/64 gateway: 2001:3200:3200::1 + +volumes: # add this section + emqx-shared-secret: # does not need anything underneath this diff --git a/.ci/docker-compose-file/kafka/jaas.conf b/.ci/docker-compose-file/kafka/jaas.conf index bf6e6716b..f6158950e 100644 --- a/.ci/docker-compose-file/kafka/jaas.conf +++ b/.ci/docker-compose-file/kafka/jaas.conf @@ -6,4 +6,11 @@ KafkaServer { org.apache.kafka.common.security.scram.ScramLoginModule required username="admin" password="password"; + + com.sun.security.auth.module.Krb5LoginModule required + useKeyTab=true + storeKey=true + keyTab="/var/lib/secret/kafka.key" + principal="kafka/kafka-1.emqx.net@KDC.EMQX.NET"; + }; diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh index 3a3d2ee21..1ffb900a8 100755 --- a/.ci/docker-compose-file/kafka/run_add_scram_users.sh +++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh @@ -2,6 +2,15 @@ set -euo pipefail + +TIMEOUT=60 + +echo "+++++++ Wait until Kerberos Keytab is created ++++++++" + +timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.key ]; do sleep 1; done' + +sleep 3 + echo "+++++++ Starting Kafka ++++++++" start-kafka.sh & diff --git a/.ci/docker-compose-file/kerberos/krb5.conf b/.ci/docker-compose-file/kerberos/krb5.conf new file mode 100644 index 000000000..032236888 --- /dev/null +++ b/.ci/docker-compose-file/kerberos/krb5.conf @@ -0,0 +1,23 @@ +[libdefaults] + default_realm = KDC.EMQX.NET + ticket_lifetime = 24h + renew_lifetime = 7d + forwardable = true + rdns = false + dns_lookup_kdc = no + dns_lookup_realm = no + +[realms] + KDC.EMQX.NET = { + kdc = kdc + admin_server = kadmin + } + +[domain_realm] + kdc.emqx.net = KDC.EMQX.NET + .kdc.emqx.net = KDC.EMQX.NET + +[logging] + kdc = FILE:/var/log/kerberos/krb5kdc.log + admin_server = FILE:/var/log/kerberos/kadmin.log + default = FILE:/var/log/kerberos/krb5lib.log diff --git a/.ci/docker-compose-file/kerberos/run.sh b/.ci/docker-compose-file/kerberos/run.sh new file mode 100755 index 000000000..c5547fb59 --- /dev/null +++ b/.ci/docker-compose-file/kerberos/run.sh @@ -0,0 +1,25 @@ +#!/bin/sh + + +echo "Remove old keytabs" + +rm -f /var/lib/secret/kafka.key 2>&1 > /dev/null +rm -f /var/lib/secret/rig.key 2>&1 > /dev/null + +echo "Create realm" + +kdb5_util -P emqx -r KDC.EMQX.NET create -s + +echo "Add principals" + +kadmin.local -w password -q "add_principal -randkey kafka/kafka-1.emqx.net@KDC.EMQX.NET" +kadmin.local -w password -q "add_principal -randkey rig@KDC.EMQX.NET" > /dev/null + + +echo "Create keytabs" + +kadmin.local -w password -q "ktadd -k /var/lib/secret/kafka.key -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null +kadmin.local -w password -q "ktadd -k /var/lib/secret/rig.key -norandkey rig@KDC.EMQX.NET " > /dev/null + +echo STARTING KDC +/usr/sbin/krb5kdc -n diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 0eb393d4d..a767a18e4 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -94,6 +94,18 @@ t_publish_sasl_scram512(_CtConfig) -> }), do_publish(Conf, KafkaTopic). +t_publish_sasl_kerberos(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => #{ + "kerberos_principal" => "rig@KDC.EMQX.NET", + "kerberos_keytab_file" => "/var/lib/secret/rig.key" + }, + "kafka_hosts_string" => kafka_hosts_string_sasl(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + config(Args) -> {ok, Conf} = hocon:binary(hocon_config(Args)), #{config := Parsed} = hocon_tconf:check_plain( @@ -139,6 +151,13 @@ hocon_config_template_authentication(#{"mechanism" := _}) -> password = {{ password }} username = {{ username }} } +"""; +hocon_config_template_authentication(#{"kerberos_principal" := _}) -> +""" +{ + kerberos_principal = \"{{ kerberos_principal }}\" + kerberos_keytab_file = \"{{ kerberos_keytab_file }}\" +} """. kafka_hosts_string() -> From 4dc26eeba79711aa792733dc2efa1411cf3a8a90 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 15 Sep 2022 07:33:07 +0200 Subject: [PATCH 05/17] fix: use different instance id in Kafka auth test --- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index a767a18e4..775942015 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -28,8 +28,7 @@ init_per_suite(Config) -> end_per_suite(_) -> ok. -do_publish(Conf, KafkaTopic) -> - InstId = <<"InstanceID">>, +do_publish(Conf, KafkaTopic, InstId) -> Time = erlang:system_time(millisecond), BinTime = integer_to_binary(Time), Msg = #{ @@ -53,7 +52,7 @@ t_publish(_CtConfig) -> "kafka_hosts_string" => kafka_hosts_string(), "kafka_topic" => KafkaTopic }), - do_publish(Conf, KafkaTopic). + do_publish(Conf, KafkaTopic, <<"NoAuthInst">>). t_publish_sasl_plain(_CtConfig) -> KafkaTopic = "test-topic-one-partition", @@ -66,7 +65,7 @@ t_publish_sasl_plain(_CtConfig) -> "kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_topic" => KafkaTopic }), - do_publish(Conf, KafkaTopic). + do_publish(Conf, KafkaTopic, <<"SASLPlainInst">>). t_publish_sasl_scram256(_CtConfig) -> KafkaTopic = "test-topic-one-partition", @@ -79,7 +78,7 @@ t_publish_sasl_scram256(_CtConfig) -> "kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_topic" => KafkaTopic }), - do_publish(Conf, KafkaTopic). + do_publish(Conf, KafkaTopic, <<"SASLScram256Inst">>). t_publish_sasl_scram512(_CtConfig) -> KafkaTopic = "test-topic-one-partition", @@ -92,7 +91,7 @@ t_publish_sasl_scram512(_CtConfig) -> "kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_topic" => KafkaTopic }), - do_publish(Conf, KafkaTopic). + do_publish(Conf, KafkaTopic, <<"SASLScram512Inst">>). t_publish_sasl_kerberos(_CtConfig) -> KafkaTopic = "test-topic-one-partition", @@ -104,7 +103,7 @@ t_publish_sasl_kerberos(_CtConfig) -> "kafka_hosts_string" => kafka_hosts_string_sasl(), "kafka_topic" => KafkaTopic }), - do_publish(Conf, KafkaTopic). + do_publish(Conf, KafkaTopic, <<"SASLKerberosInst">>). config(Args) -> {ok, Conf} = hocon:binary(hocon_config(Args)), From be7a8c11a8227a572e361f3b1cac9a3852a15f6c Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Thu, 15 Sep 2022 16:21:32 +0200 Subject: [PATCH 06/17] test: make bridge name unique in tests --- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 40 +++++++++++++------ 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 775942015..1c79e9bd8 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -38,23 +38,28 @@ do_publish(Conf, KafkaTopic, InstId) -> }, {ok, Offset} = resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0), ct:pal("base offset before testing ~p", [Offset]), - {ok, State} = ?PRODUCER:on_start(InstId, Conf), - ok = ?PRODUCER:on_query(InstId, {send_message, Msg}, State), + StartRes = ?PRODUCER:on_start(InstId, Conf), + {ok, State} = StartRes, + OnQueryRes = ?PRODUCER:on_query(InstId, {send_message, Msg}, State), + ok = OnQueryRes, {ok, {_, [KafkaMsg]}} = brod:fetch(kafka_hosts(), KafkaTopic, 0, Offset), ?assertMatch(#kafka_message{key = BinTime}, KafkaMsg), ok = ?PRODUCER:on_stop(InstId, State), ok. t_publish(_CtConfig) -> + InstId = emqx_bridge_resource:resource_id("kafka", "NoAuthInst"), KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => "none", "kafka_hosts_string" => kafka_hosts_string(), - "kafka_topic" => KafkaTopic + "kafka_topic" => KafkaTopic, + "instance_id" => InstId }), - do_publish(Conf, KafkaTopic, <<"NoAuthInst">>). + do_publish(Conf, KafkaTopic, InstId). t_publish_sasl_plain(_CtConfig) -> + InstId = emqx_bridge_resource:resource_id("kafka", "SASLPlainInst"), KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => #{ @@ -63,11 +68,14 @@ t_publish_sasl_plain(_CtConfig) -> "password" => "password" }, "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic + "kafka_topic" => KafkaTopic, + "instance_id" => InstId }), - do_publish(Conf, KafkaTopic, <<"SASLPlainInst">>). + do_publish(Conf, KafkaTopic, InstId). t_publish_sasl_scram256(_CtConfig) -> + InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram256Inst"), + KafkaTopic = "test-topic-one-partition", KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => #{ @@ -76,11 +84,13 @@ t_publish_sasl_scram256(_CtConfig) -> "password" => "password" }, "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic + "kafka_topic" => KafkaTopic, + "instance_id" => InstId }), - do_publish(Conf, KafkaTopic, <<"SASLScram256Inst">>). + do_publish(Conf, KafkaTopic, InstId). t_publish_sasl_scram512(_CtConfig) -> + InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram512Inst"), KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => #{ @@ -89,11 +99,13 @@ t_publish_sasl_scram512(_CtConfig) -> "password" => "password" }, "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic + "kafka_topic" => KafkaTopic, + "instance_id" => InstId }), - do_publish(Conf, KafkaTopic, <<"SASLScram512Inst">>). + do_publish(Conf, KafkaTopic, InstId). t_publish_sasl_kerberos(_CtConfig) -> + InstId = emqx_bridge_resource:resource_id("kafka", "SASLKerberosInst"), KafkaTopic = "test-topic-one-partition", Conf = config(#{ "authentication" => #{ @@ -101,9 +113,10 @@ t_publish_sasl_kerberos(_CtConfig) -> "kerberos_keytab_file" => "/var/lib/secret/rig.key" }, "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic + "kafka_topic" => KafkaTopic, + "instance_id" => InstId }), - do_publish(Conf, KafkaTopic, <<"SASLKerberosInst">>). + do_publish(Conf, KafkaTopic, InstId). config(Args) -> {ok, Conf} = hocon:binary(hocon_config(Args)), @@ -112,7 +125,8 @@ config(Args) -> #{<<"config">> => Conf}, #{atom_key => true} ), - Parsed#{bridge_name => "testbridge"}. + InstId = maps:get("instance_id", Args), + Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(InstId))}. hocon_config(Args) -> AuthConf = maps:get("authentication", Args), From ac2922fc4cd3c1f4980508df83d2cb6ab2d1cd70 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 16 Sep 2022 16:44:12 +0200 Subject: [PATCH 07/17] test: Kafka bridge cases for all combinations of SASL and SSL --- .../docker-compose-kafka.yaml | 72 +++--- .../kafka/generate-certs.sh | 45 ++++ .ci/docker-compose-file/kafka/jaas.conf | 2 +- .../kafka/run_add_scram_users.sh | 11 +- .ci/docker-compose-file/kerberos/run.sh | 8 +- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 209 +++++++++++------- 6 files changed, 236 insertions(+), 111 deletions(-) create mode 100755 .ci/docker-compose-file/kafka/generate-certs.sh diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index a4a064c16..3bb7748d5 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -9,38 +9,13 @@ services: hostname: zookeeper networks: emqx_bridge: - kafka_1: - image: wurstmeister/kafka:2.13-2.7.0 - ports: - - "9092:9092" - - "9093:9093" - container_name: kafka-1.emqx.net - hostname: kafka-1.emqx.net - depends_on: - - "kdc" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI - KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka - KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN - KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true - KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - networks: - emqx_bridge: + ssl_cert_gen: + image: fredrikhgrelland/alpine-jdk11-openssl + container_name: ssl_cert_gen volumes: - emqx-shared-secret:/var/lib/secret - - ./kafka/jaas.conf:/etc/kafka/jaas.conf - - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh - - ./kerberos/krb5.conf:/etc/kdc/krb5.conf - - ./kerberos/krb5.conf:/etc/krb5.conf - command: run_add_scram_users.sh + - ./kafka/generate-certs.sh:/bin/generate-certs.sh + command: /bin/generate-certs.sh kdc: hostname: kdc.emqx.net image: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04 @@ -53,4 +28,41 @@ services: - ./kerberos/krb5.conf:/etc/krb5.conf - ./kerberos/run.sh:/usr/bin/run.sh command: run.sh + kafka_1: + image: wurstmeister/kafka:2.13-2.7.0 + ports: + - "9092:9092" + - "9093:9093" + container_name: kafka-1.emqx.net + hostname: kafka-1.emqx.net + depends_on: + - "kdc" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093,SSL://:9094,SASL_SSL://:9095 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093,SSL://kafka-1.emqx.net:9094,SASL_SSL://kafka-1.emqx.net:9095 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI + KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer + KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/kafka.truststore.jks + KAFKA_SSL_TRUSTSTORE_PASSWORD: password + KAFKA_SSL_KEYSTORE_LOCATION: /var/lib/secret/kafka.keystore.jks + KAFKA_SSL_KEYSTORE_PASSWORD: password + KAFKA_SSL_KEY_PASSWORD: password + networks: + emqx_bridge: + volumes: + - emqx-shared-secret:/var/lib/secret + - ./kafka/jaas.conf:/etc/kafka/jaas.conf + - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh + - ./kerberos/krb5.conf:/etc/kdc/krb5.conf + - ./kerberos/krb5.conf:/etc/krb5.conf + command: run_add_scram_users.sh diff --git a/.ci/docker-compose-file/kafka/generate-certs.sh b/.ci/docker-compose-file/kafka/generate-certs.sh new file mode 100755 index 000000000..d0ae4a8d0 --- /dev/null +++ b/.ci/docker-compose-file/kafka/generate-certs.sh @@ -0,0 +1,45 @@ +#!/bin/sh + +set -euo pipefail + +set -x + +# Source https://github.com/zmstone/docker-kafka/blob/master/generate-certs.sh + +HOST="*." +DAYS=3650 +PASS="password" + +cd /var/lib/secret/ + +# Delete old files +(rm ca.key ca.crt server.key server.csr server.crt client.key client.csr client.crt server.p12 kafka.keystore.jks kafka.truststore.jks 2>/dev/null || true) + +ls + +echo == Generate self-signed server and client certificates +echo = generate CA +openssl req -new -x509 -keyout ca.key -out ca.crt -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" + +echo = generate server certificate request +openssl req -newkey rsa:2048 -sha256 -keyout server.key -out server.csr -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" + +echo = sign server certificate +openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days $DAYS -CAcreateserial + +echo = generate client certificate request +openssl req -newkey rsa:2048 -sha256 -keyout client.key -out client.csr -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" + +echo == sign client certificate +openssl x509 -req -CA ca.crt -CAkey ca.key -in client.csr -out client.crt -days $DAYS -CAserial ca.srl + +echo = Convert self-signed certificate to PKCS#12 format +openssl pkcs12 -export -name $HOST -in server.crt -inkey server.key -out server.p12 -CAfile ca.crt -passout pass:$PASS + +echo = Import PKCS#12 into a java keystore + +echo $PASS | keytool -importkeystore -destkeystore kafka.keystore.jks -srckeystore server.p12 -srcstoretype pkcs12 -alias $HOST -storepass $PASS + +echo = Import CA into java truststore + +echo yes | keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca.crt -storepass $PASS diff --git a/.ci/docker-compose-file/kafka/jaas.conf b/.ci/docker-compose-file/kafka/jaas.conf index f6158950e..8ffe8457d 100644 --- a/.ci/docker-compose-file/kafka/jaas.conf +++ b/.ci/docker-compose-file/kafka/jaas.conf @@ -10,7 +10,7 @@ KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true - keyTab="/var/lib/secret/kafka.key" + keyTab="/var/lib/secret/kafka.keytab" principal="kafka/kafka-1.emqx.net@KDC.EMQX.NET"; }; diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh index 1ffb900a8..32f42a9e9 100755 --- a/.ci/docker-compose-file/kafka/run_add_scram_users.sh +++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh @@ -5,9 +5,18 @@ set -euo pipefail TIMEOUT=60 +echo "+++++++ Sleep for a while to make sure that old keytab and truststore is deleted ++++++++" + +sleep 5 + echo "+++++++ Wait until Kerberos Keytab is created ++++++++" -timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.key ]; do sleep 1; done' +timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.keytab ]; do sleep 1; done' + + +echo "+++++++ Wait until SSL certs are generated ++++++++" + +timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.truststore.jks ]; do sleep 1; done' sleep 3 diff --git a/.ci/docker-compose-file/kerberos/run.sh b/.ci/docker-compose-file/kerberos/run.sh index c5547fb59..85f172207 100755 --- a/.ci/docker-compose-file/kerberos/run.sh +++ b/.ci/docker-compose-file/kerberos/run.sh @@ -3,8 +3,8 @@ echo "Remove old keytabs" -rm -f /var/lib/secret/kafka.key 2>&1 > /dev/null -rm -f /var/lib/secret/rig.key 2>&1 > /dev/null +rm -f /var/lib/secret/kafka.keytab 2>&1 > /dev/null +rm -f /var/lib/secret/rig.keytab 2>&1 > /dev/null echo "Create realm" @@ -18,8 +18,8 @@ kadmin.local -w password -q "add_principal -randkey rig@KDC.EMQX.NET" > /dev/nu echo "Create keytabs" -kadmin.local -w password -q "ktadd -k /var/lib/secret/kafka.key -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null -kadmin.local -w password -q "ktadd -k /var/lib/secret/rig.key -norandkey rig@KDC.EMQX.NET " > /dev/null +kadmin.local -w password -q "ktadd -k /var/lib/secret/kafka.keytab -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null +kadmin.local -w password -q "ktadd -k /var/lib/secret/rig.keytab -norandkey rig@KDC.EMQX.NET " > /dev/null echo STARTING KDC /usr/sbin/krb5kdc -n diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 1c79e9bd8..9ca87d106 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -20,16 +20,84 @@ all() -> emqx_common_test_helpers:all(?MODULE). +wait_until_kafka_is_up() -> + wait_until_kafka_is_up(0). + +wait_until_kafka_is_up(90) -> + ct:fail("Kafka is not up even though we have waited for a while"); +wait_until_kafka_is_up(Attempts) -> + KafkaTopic = "test-topic-one-partition", + case resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0) of + {ok, _} -> + ok; + _ -> + timer:sleep(1000), + wait_until_kafka_is_up(Attempts + 1) + end. + init_per_suite(Config) -> {ok, _} = application:ensure_all_started(brod), {ok, _} = application:ensure_all_started(wolff), + wait_until_kafka_is_up(), Config. end_per_suite(_) -> ok. -do_publish(Conf, KafkaTopic, InstId) -> - Time = erlang:system_time(millisecond), +t_publish_no_auth(_CtConfig) -> + publish_with_and_without_ssl("none"). + +t_publish_sasl_plain(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_plain_settings()). + +t_publish_sasl_scram256(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_scram256_settings()). + +t_publish_sasl_scram512(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_scram512_settings()). + +t_publish_sasl_kerberos(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_kerberos_settings()). + +publish_with_and_without_ssl(AuthSettings) -> + publish_helper(#{ + auth_settings => AuthSettings, + ssl_settings => #{} + }), + publish_helper(#{ + auth_settings => AuthSettings, + ssl_settings => valid_ssl_settings() + }). + +publish_helper(#{ + auth_settings := AuthSettings, + ssl_settings := SSLSettings +}) -> + HostsString = + case {AuthSettings, SSLSettings} of + {"none", Map} when map_size(Map) =:= 0 -> + kafka_hosts_string(); + {"none", Map} when map_size(Map) =/= 0 -> + kafka_hosts_string_ssl(); + {_, Map} when map_size(Map) =:= 0 -> + kafka_hosts_string_sasl(); + {_, _} -> + kafka_hosts_string_ssl_sasl() + end, + Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]), + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + InstId = emqx_bridge_resource:resource_id("kafka", Name), + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => InstId, + "ssl" => SSLSettings + }), + %% To make sure we get unique value + timer:sleep(1), + Time = erlang:monotonic_time(), BinTime = integer_to_binary(Time), Msg = #{ clientid => BinTime, @@ -47,79 +115,10 @@ do_publish(Conf, KafkaTopic, InstId) -> ok = ?PRODUCER:on_stop(InstId, State), ok. -t_publish(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "NoAuthInst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => "none", - "kafka_hosts_string" => kafka_hosts_string(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_plain(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLPlainInst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "mechanism" => "plain", - "username" => "emqxuser", - "password" => "password" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_scram256(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram256Inst"), - KafkaTopic = "test-topic-one-partition", - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "mechanism" => "scram_sha_256", - "username" => "emqxuser", - "password" => "password" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_scram512(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram512Inst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "mechanism" => "scram_sha_512", - "username" => "emqxuser", - "password" => "password" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_kerberos(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLKerberosInst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "kerberos_principal" => "rig@KDC.EMQX.NET", - "kerberos_keytab_file" => "/var/lib/secret/rig.key" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - config(Args) -> - {ok, Conf} = hocon:binary(hocon_config(Args)), + ConfText = hocon_config(Args), + ct:pal("Running tests with conf:\n~s", [ConfText]), + {ok, Conf} = hocon:binary(ConfText), #{config := Parsed} = hocon_tconf:check_plain( emqx_ee_bridge_kafka, #{<<"config">> => Conf}, @@ -132,9 +131,15 @@ hocon_config(Args) -> AuthConf = maps:get("authentication", Args), AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)), AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf), + SSLConf = maps:get("ssl", Args, #{}), + SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)), + SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf), Hocon = bbmustache:render( iolist_to_binary(hocon_config_template()), - Args#{"authentication" => AuthConfRendered} + Args#{ + "authentication" => AuthConfRendered, + "ssl" => SSLConfRendered + } ), Hocon. @@ -144,6 +149,7 @@ hocon_config_template() -> bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true authentication = {{{ authentication }}} +ssl = {{{ ssl }}} producer = { mqtt { topic = \"t/#\" @@ -173,12 +179,65 @@ hocon_config_template_authentication(#{"kerberos_principal" := _}) -> } """. +%% erlfmt-ignore +hocon_config_template_ssl(Map) when map_size(Map) =:= 0 -> +""" +{ + enable = false +} +"""; +hocon_config_template_ssl(_) -> +""" +{ + enable = true + cacertfile = \"{{{cacertfile}}}\" + certfile = \"{{{certfile}}}\" + keyfile = \"{{{keyfile}}}\" +} +""". + kafka_hosts_string() -> "kafka-1.emqx.net:9092,". kafka_hosts_string_sasl() -> "kafka-1.emqx.net:9093,". +kafka_hosts_string_ssl() -> + "kafka-1.emqx.net:9094,". + +kafka_hosts_string_ssl_sasl() -> + "kafka-1.emqx.net:9095,". + +valid_ssl_settings() -> + #{ + "cacertfile" => <<"/var/lib/secret/ca.crt">>, + "certfile" => <<"/var/lib/secret/client.crt">>, + "keyfile" => <<"/var/lib/secret/client.key">> + }. + +valid_sasl_plain_settings() -> + #{ + "mechanism" => "plain", + "username" => "emqxuser", + "password" => "password" + }. + +valid_sasl_scram256_settings() -> + (valid_sasl_plain_settings())#{ + "mechanism" => "scram_sha_256" + }. + +valid_sasl_scram512_settings() -> + (valid_sasl_plain_settings())#{ + "mechanism" => "scram_sha_512" + }. + +valid_sasl_kerberos_settings() -> + #{ + "kerberos_principal" => "rig@KDC.EMQX.NET", + "kerberos_keytab_file" => "/var/lib/secret/rig.keytab" + }. + kafka_hosts() -> kpro:parse_endpoints(kafka_hosts_string()). From ba34326010e46f8a240a2bc301c3f6bd46800621 Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 23 Sep 2022 08:58:51 +0200 Subject: [PATCH 08/17] ci(kafka): fix shellcheck errors --- .../kafka/generate-certs.sh | 33 ++++++++++--------- .../kafka/run_add_scram_users.sh | 2 ++ 2 files changed, 19 insertions(+), 16 deletions(-) diff --git a/.ci/docker-compose-file/kafka/generate-certs.sh b/.ci/docker-compose-file/kafka/generate-certs.sh index d0ae4a8d0..3f1c75550 100755 --- a/.ci/docker-compose-file/kafka/generate-certs.sh +++ b/.ci/docker-compose-file/kafka/generate-certs.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/usr/bin/bash set -euo pipefail @@ -17,29 +17,30 @@ cd /var/lib/secret/ ls -echo == Generate self-signed server and client certificates -echo = generate CA +echo '== Generate self-signed server and client certificates' +echo '= generate CA' openssl req -new -x509 -keyout ca.key -out ca.crt -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" -echo = generate server certificate request -openssl req -newkey rsa:2048 -sha256 -keyout server.key -out server.csr -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" +echo '= generate server certificate request' +openssl req -newkey rsa:2048 -sha256 -keyout server.key -out server.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" -echo = sign server certificate -openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days $DAYS -CAcreateserial +echo '= sign server certificate' +openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days "$DAYS" -CAcreateserial -echo = generate client certificate request -openssl req -newkey rsa:2048 -sha256 -keyout client.key -out client.csr -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" +echo '= generate client certificate request' +openssl req -newkey rsa:2048 -sha256 -keyout client.key -out client.csr -days "$DAYS" -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" -echo == sign client certificate +echo '== sign client certificate' openssl x509 -req -CA ca.crt -CAkey ca.key -in client.csr -out client.crt -days $DAYS -CAserial ca.srl -echo = Convert self-signed certificate to PKCS#12 format -openssl pkcs12 -export -name $HOST -in server.crt -inkey server.key -out server.p12 -CAfile ca.crt -passout pass:$PASS +echo '= Convert self-signed certificate to PKCS#12 format' +openssl pkcs12 -export -name "$HOST" -in server.crt -inkey server.key -out server.p12 -CAfile ca.crt -passout pass:"$PASS" -echo = Import PKCS#12 into a java keystore +echo '= Import PKCS#12 into a java keystore' -echo $PASS | keytool -importkeystore -destkeystore kafka.keystore.jks -srckeystore server.p12 -srcstoretype pkcs12 -alias $HOST -storepass $PASS +echo $PASS | keytool -importkeystore -destkeystore kafka.keystore.jks -srckeystore server.p12 -srcstoretype pkcs12 -alias "$HOST" -storepass "$PASS" -echo = Import CA into java truststore -echo yes | keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca.crt -storepass $PASS +echo '= Import CA into java truststore' + +echo yes | keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca.crt -storepass "$PASS" diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh index 32f42a9e9..e997a310c 100755 --- a/.ci/docker-compose-file/kafka/run_add_scram_users.sh +++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh @@ -31,8 +31,10 @@ TIMEOUT=60 echo "+++++++ Wait until Kafka ports are up ++++++++" +# shellcheck disable=SC2016 timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 +# shellcheck disable=SC2016 timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT2 echo "+++++++ Run config commands ++++++++" From 516d60c7dad14f7c33cb4f8a990388286816d98d Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 23 Sep 2022 09:00:17 +0200 Subject: [PATCH 09/17] build: fix deps consistency check --- apps/emqx/rebar.config | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/emqx/rebar.config b/apps/emqx/rebar.config index 242f95fcb..8c7635d58 100644 --- a/apps/emqx/rebar.config +++ b/apps/emqx/rebar.config @@ -22,7 +22,7 @@ %% This rebar.config is necessary because the app may be used as a %% `git_subdir` dependency in other projects. {deps, [ - {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.1"}}}, + {lc, {git, "https://github.com/emqx/lc.git", {tag, "0.3.2"}}}, {gproc, {git, "https://github.com/uwiger/gproc", {tag, "0.8.0"}}}, {jiffy, {git, "https://github.com/emqx/jiffy", {tag, "1.0.5"}}}, {cowboy, {git, "https://github.com/emqx/cowboy", {tag, "2.9.0"}}}, From adc67b165b641862a88fc6b5d7090ce288985280 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 23 Sep 2022 10:07:22 +0200 Subject: [PATCH 10/17] test: test cases for Kafka bridge REST API --- apps/emqx_bridge/src/emqx_bridge_api.erl | 2 +- .../emqx_bridge_impl_kafka_producer_SUITE.erl | 321 +++++++++++++++++- 2 files changed, 318 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge_api.erl b/apps/emqx_bridge/src/emqx_bridge_api.erl index a353c9cf0..6bfa439d1 100644 --- a/apps/emqx_bridge/src/emqx_bridge_api.erl +++ b/apps/emqx_bridge/src/emqx_bridge_api.erl @@ -92,7 +92,7 @@ param_path_operation_cluster() -> #{ in => path, required => true, - example => <<"start">>, + example => <<"restart">>, desc => ?DESC("desc_param_path_operation_cluster") } )}. diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 9ca87d106..19ab05cc5 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -13,6 +13,34 @@ -define(PRODUCER, emqx_bridge_impl_kafka). +%%------------------------------------------------------------------------------ +%% Things for REST API tests +%%------------------------------------------------------------------------------ + +-import( + emqx_common_test_http, + [ + request_api/3, + request_api/5, + get_http_data/1 + ] +). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("emqx/include/emqx.hrl"). +-include("emqx_dashboard.hrl"). + +-define(CONTENT_TYPE, "application/x-www-form-urlencoded"). + +-define(HOST, "http://127.0.0.1:18083"). + +%% -define(API_VERSION, "v5"). + +-define(BASE_PATH, "/api/v5"). + +-define(APP_DASHBOARD, emqx_dashboard). +-define(APP_MANAGEMENT, emqx_management). + %%------------------------------------------------------------------------------ %% CT boilerplate %%------------------------------------------------------------------------------ @@ -36,13 +64,89 @@ wait_until_kafka_is_up(Attempts) -> end. init_per_suite(Config) -> - {ok, _} = application:ensure_all_started(brod), - {ok, _} = application:ensure_all_started(wolff), + %% Need to unload emqx_authz. See emqx_machine_SUITE:init_per_suite for + %% more info. + application:unload(emqx_authz), + emqx_common_test_helpers:start_apps( + [emqx_conf, emqx_rule_engine, emqx_bridge, emqx_management, emqx_dashboard], + fun set_special_configs/1 + ), + application:set_env(emqx_machine, applications, [ + emqx_prometheus, + emqx_modules, + emqx_dashboard, + emqx_gateway, + emqx_statsd, + emqx_resource, + emqx_rule_engine, + emqx_bridge, + emqx_ee_bridge, + emqx_plugin_libs, + emqx_management, + emqx_retainer, + emqx_exhook, + emqx_authn, + emqx_authz, + emqx_plugin + ]), + {ok, _} = application:ensure_all_started(emqx_machine), wait_until_kafka_is_up(), + %% Wait until bridges API is up + (fun WaitUntilRestApiUp() -> + case show(http_get(["bridges"])) of + {ok, 200, _Res} -> + ok; + Val -> + ct:pal("REST API for bridges not up. Wait and try again. Response: ~p", [Val]), + timer:sleep(1000), + WaitUntilRestApiUp() + end + end)(), Config. -end_per_suite(_) -> +end_per_suite(Config) -> + emqx_common_test_helpers:stop_apps([ + emqx_prometheus, + emqx_modules, + emqx_dashboard, + emqx_gateway, + emqx_statsd, + emqx_resource, + emqx_rule_engine, + emqx_bridge, + emqx_ee_bridge, + emqx_plugin_libs, + emqx_management, + emqx_retainer, + emqx_exhook, + emqx_authn, + emqx_authz, + emqx_plugin, + emqx_conf, + emqx_bridge, + emqx_management, + emqx_dashboard, + emqx_machine + ]), + mria:stop(), + Config. + +set_special_configs(emqx_management) -> + Listeners = #{http => #{port => 8081}}, + Config = #{ + listeners => Listeners, + applications => [#{id => "admin", secret => "public"}] + }, + emqx_config:put([emqx_management], Config), + ok; +set_special_configs(emqx_dashboard) -> + emqx_dashboard_api_test_helpers:set_default_config(), + ok; +set_special_configs(_) -> ok. +%%------------------------------------------------------------------------------ +%% Test cases for all combinations of SSL, no SSL and authentication types +%%------------------------------------------------------------------------------ t_publish_no_auth(_CtConfig) -> publish_with_and_without_ssl("none"). @@ -59,6 +163,160 @@ t_publish_sasl_scram512(_CtConfig) -> t_publish_sasl_kerberos(_CtConfig) -> publish_with_and_without_ssl(valid_sasl_kerberos_settings()). +%%------------------------------------------------------------------------------ +%% Test cases for REST api +%%------------------------------------------------------------------------------ + +show(X) -> + % erlang:display('______________ SHOW ______________:'), + % erlang:display(X), + X. + +t_kafka_bridge_rest_api_plain_text(_CtConfig) -> + kafka_bridge_rest_api_all_auth_methods(false). + +t_kafka_bridge_rest_api_ssl(_CtConfig) -> + kafka_bridge_rest_api_all_auth_methods(true). + +kafka_bridge_rest_api_all_auth_methods(UseSSL) -> + NormalHostsString = + case UseSSL of + true -> kafka_hosts_string_ssl(); + false -> kafka_hosts_string() + end, + kafka_bridge_rest_api_helper(#{ + <<"bootstrap_hosts">> => NormalHostsString, + <<"authentication">> => <<"none">> + }), + SASLHostsString = + case UseSSL of + true -> kafka_hosts_string_ssl_sasl(); + false -> kafka_hosts_string_sasl() + end, + BinifyMap = fun(Map) -> + maps:from_list([ + {erlang:iolist_to_binary(K), erlang:iolist_to_binary(V)} + || {K, V} <- maps:to_list(Map) + ]) + end, + SSLSettings = + case UseSSL of + true -> #{<<"ssl">> => BinifyMap(valid_ssl_settings())}; + false -> #{} + end, + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_plain_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_scram256_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_scram512_settings()) + }, + SSLSettings + ) + ), + kafka_bridge_rest_api_helper( + maps:merge( + #{ + <<"bootstrap_hosts">> => SASLHostsString, + <<"authentication">> => BinifyMap(valid_sasl_kerberos_settings()) + }, + SSLSettings + ) + ), + ok. + +kafka_bridge_rest_api_helper(Config) -> + UrlEscColon = "%3A", + BridgeIdUrlEnc = "kafka" ++ UrlEscColon ++ "my_kafka_bridge", + BridgesParts = ["bridges"], + BridgesPartsId = ["bridges", BridgeIdUrlEnc], + OpUrlFun = fun(OpName) -> ["bridges", BridgeIdUrlEnc, "operation", OpName] end, + BridgesPartsOpDisable = OpUrlFun("disable"), + BridgesPartsOpEnable = OpUrlFun("enable"), + BridgesPartsOpRestart = OpUrlFun("restart"), + BridgesPartsOpStop = OpUrlFun("stop"), + %% List bridges + MyKafkaBridgeExists = fun() -> + {ok, _Code, BridgesData} = show(http_get(BridgesParts)), + Bridges = show(json(BridgesData)), + lists:any( + fun + (#{<<"name">> := <<"my_kafka_bridge">>}) -> true; + (_) -> false + end, + Bridges + ) + end, + %% Delete if my_kafka_bridge exists + case MyKafkaBridgeExists() of + true -> + %% Delete the bridge my_kafka_bridge + show( + '========================================== DELETE ========================================' + ), + {ok, 204, <<>>} = show(http_delete(BridgesPartsId)); + false -> + ok + end, + false = MyKafkaBridgeExists(), + %% Create new Kafka bridge + CreateBodyTmp = #{ + <<"type">> => <<"kafka">>, + <<"name">> => <<"my_kafka_bridge">>, + <<"bootstrap_hosts">> => maps:get(<<"bootstrap_hosts">>, Config), + <<"enable">> => true, + <<"authentication">> => maps:get(<<"authentication">>, Config), + <<"producer">> => #{ + <<"mqtt">> => #{ + topic => <<"t/#">> + }, + <<"kafka">> => #{ + <<"topic">> => <<"test-topic-one-partition">> + } + } + }, + CreateBody = + case maps:is_key(<<"ssl">>, Config) of + true -> CreateBodyTmp#{<<"ssl">> => maps:get(<<"ssl">>, Config)}; + false -> CreateBodyTmp + end, + {ok, 201, _Data} = show(http_post(BridgesParts, show(CreateBody))), + %% Check that the new bridge is in the list of bridges + true = MyKafkaBridgeExists(), + %% Perform operations + {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpDisable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpEnable), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpStop), #{})), + {ok, 200, _} = show(http_post(show(BridgesPartsOpRestart), #{})), + %% Cleanup + {ok, 204, _} = show(http_delete(BridgesPartsId)), + false = MyKafkaBridgeExists(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper functions +%%------------------------------------------------------------------------------ + publish_with_and_without_ssl(AuthSettings) -> publish_helper(#{ auth_settings => AuthSettings, @@ -212,7 +470,8 @@ valid_ssl_settings() -> #{ "cacertfile" => <<"/var/lib/secret/ca.crt">>, "certfile" => <<"/var/lib/secret/client.crt">>, - "keyfile" => <<"/var/lib/secret/client.key">> + "keyfile" => <<"/var/lib/secret/client.key">>, + "enable" => <<"true">> }. valid_sasl_plain_settings() -> @@ -243,3 +502,57 @@ kafka_hosts() -> resolve_kafka_offset(Hosts, Topic, Partition) -> brod:resolve_offset(Hosts, Topic, Partition, latest). + +%%------------------------------------------------------------------------------ +%% Internal functions rest API helpers +%%------------------------------------------------------------------------------ + +bin(X) -> iolist_to_binary(X). + +random_num() -> + erlang:system_time(nanosecond). + +http_get(Parts) -> + request_api(get, api_path(Parts), auth_header_()). + +http_delete(Parts) -> + request_api(delete, api_path(Parts), auth_header_()). + +http_post(Parts, Body) -> + request_api(post, api_path(Parts), [], auth_header_(), Body). + +http_put(Parts, Body) -> + request_api(put, api_path(Parts), [], auth_header_(), Body). + +request_dashboard(Method, Url, Auth) -> + Request = {Url, [Auth]}, + do_request_dashboard(Method, Request). +request_dashboard(Method, Url, QueryParams, Auth) -> + Request = {Url ++ "?" ++ QueryParams, [Auth]}, + do_request_dashboard(Method, Request). +do_request_dashboard(Method, Request) -> + ct:pal("Method: ~p, Request: ~p", [Method, Request]), + case httpc:request(Method, Request, [], []) of + {error, socket_closed_remotely} -> + {error, socket_closed_remotely}; + {ok, {{"HTTP/1.1", Code, _}, _Headers, Return}} when + Code >= 200 andalso Code =< 299 + -> + {ok, Return}; + {ok, {Reason, _, _}} -> + {error, Reason} + end. + +auth_header_() -> + auth_header_(<<"admin">>, <<"public">>). + +auth_header_(Username, Password) -> + {ok, Token} = emqx_dashboard_admin:sign_token(Username, Password), + {"Authorization", "Bearer " ++ binary_to_list(Token)}. + +api_path(Parts) -> + ?HOST ++ filename:join([?BASE_PATH | Parts]). + +json(Data) -> + {ok, Jsx} = emqx_json:safe_decode(Data, [return_maps]), + Jsx. From 9f3e38aeb03c29175325f2779c70407f72ecf80a Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 23 Sep 2022 10:37:30 +0200 Subject: [PATCH 11/17] test: fix error in script detected by spellcheck script --- .ci/docker-compose-file/kerberos/run.sh | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.ci/docker-compose-file/kerberos/run.sh b/.ci/docker-compose-file/kerberos/run.sh index 85f172207..c9580073f 100755 --- a/.ci/docker-compose-file/kerberos/run.sh +++ b/.ci/docker-compose-file/kerberos/run.sh @@ -3,8 +3,8 @@ echo "Remove old keytabs" -rm -f /var/lib/secret/kafka.keytab 2>&1 > /dev/null -rm -f /var/lib/secret/rig.keytab 2>&1 > /dev/null +rm -f /var/lib/secret/kafka.keytab > /dev/null 2>&1 +rm -f /var/lib/secret/rig.keytab > /dev/null 2>&1 echo "Create realm" From c4f7d385b57a90c4eadd65135163332eb7d09c39 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 23 Sep 2022 11:47:12 +0200 Subject: [PATCH 12/17] test: fix true not allowed as docker compose env var value --- .ci/docker-compose-file/docker-compose-kafka.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index 3bb7748d5..d610316bd 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -48,7 +48,7 @@ services: KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: "true" KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/kafka.truststore.jks From 7b601bf970328ef7bf7704754e8eeb039b8e0e4a Mon Sep 17 00:00:00 2001 From: "Zaiming (Stone) Shi" Date: Fri, 23 Sep 2022 11:52:12 +0200 Subject: [PATCH 13/17] chore: delete bad parse_bridge function clause --- apps/emqx_bridge/src/emqx_bridge.erl | 2 -- apps/emqx_bridge/src/emqx_bridge_resource.erl | 2 -- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 3 ++- 3 files changed, 2 insertions(+), 5 deletions(-) diff --git a/apps/emqx_bridge/src/emqx_bridge.erl b/apps/emqx_bridge/src/emqx_bridge.erl index ceca5ea7f..d4d24ef3a 100644 --- a/apps/emqx_bridge/src/emqx_bridge.erl +++ b/apps/emqx_bridge/src/emqx_bridge.erl @@ -334,8 +334,6 @@ get_matched_bridges(Topic) -> Bridges ). -%% TODO: refactor to return bridge type, and bridge name directly -%% so there is no need to parse the id back to type and name at where it is used get_matched_bridge_id(_BType, #{enable := false}, _Topic, _BName, Acc) -> Acc; get_matched_bridge_id(BType, #{local_topic := Filter}, Topic, BName, Acc) when diff --git a/apps/emqx_bridge/src/emqx_bridge_resource.erl b/apps/emqx_bridge/src/emqx_bridge_resource.erl index b6fd2e7be..2894ec461 100644 --- a/apps/emqx_bridge/src/emqx_bridge_resource.erl +++ b/apps/emqx_bridge/src/emqx_bridge_resource.erl @@ -71,8 +71,6 @@ bridge_id(BridgeType, BridgeName) -> Type = bin(BridgeType), <>. -parse_bridge_id(<<"bridge:", BridgeId/binary>>) -> - parse_bridge_id(BridgeId); parse_bridge_id(BridgeId) -> case string:split(bin(BridgeId), ":", all) of [Type, Name] -> {binary_to_atom(Type, utf8), binary_to_atom(Name, utf8)}; diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 19ab05cc5..90ddd6933 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -383,7 +383,8 @@ config(Args) -> #{atom_key => true} ), InstId = maps:get("instance_id", Args), - Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(InstId))}. + <<"bridge:", BridgeId>> = InstId, + Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(BridgeId))}. hocon_config(Args) -> AuthConf = maps:get("authentication", Args), From 8e514680d8fac1c3fc5eab7899a5f677ff891c30 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 23 Sep 2022 13:54:11 +0200 Subject: [PATCH 14/17] test: fix bad binary pattern --- .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 90ddd6933..5e9dbfc73 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -383,7 +383,7 @@ config(Args) -> #{atom_key => true} ), InstId = maps:get("instance_id", Args), - <<"bridge:", BridgeId>> = InstId, + <<"bridge:", BridgeId/binary>> = InstId, Parsed#{bridge_name => erlang:element(2, emqx_bridge_resource:parse_bridge_id(BridgeId))}. hocon_config(Args) -> From a3c88b40a02fc4ae7caa9f8cd825231fb1502e20 Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 23 Sep 2022 14:33:41 +0200 Subject: [PATCH 15/17] test: changes to make Kafka container run in GitHub action --- .ci/docker-compose-file/docker-compose-kafka.yaml | 4 ++++ .../test/emqx_bridge_impl_kafka_producer_SUITE.erl | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index d610316bd..d58f51146 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -33,10 +33,14 @@ services: ports: - "9092:9092" - "9093:9093" + - "9094:9094" + - "9095:9095" container_name: kafka-1.emqx.net hostname: kafka-1.emqx.net depends_on: - "kdc" + - "zookeeper" + - "ssl_cert_gen" environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 5e9dbfc73..fb929e692 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -51,7 +51,7 @@ all() -> wait_until_kafka_is_up() -> wait_until_kafka_is_up(0). -wait_until_kafka_is_up(90) -> +wait_until_kafka_is_up(300) -> ct:fail("Kafka is not up even though we have waited for a while"); wait_until_kafka_is_up(Attempts) -> KafkaTopic = "test-topic-one-partition", From ac37c5d58aca781f800023d60098f8e30f5347ed Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 23 Sep 2022 15:02:01 +0200 Subject: [PATCH 16/17] test: github actions debug printouts --- .ci/docker-compose-file/kafka/run_add_scram_users.sh | 3 +++ scripts/ct/run.sh | 6 ++++++ 2 files changed, 9 insertions(+) diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh index e997a310c..4b51fee0d 100755 --- a/.ci/docker-compose-file/kafka/run_add_scram_users.sh +++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh @@ -44,3 +44,6 @@ kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-S echo "+++++++ Wait until Kafka ports are down ++++++++" bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 + +echo "+++++++ Kafka ports are down ++++++++" + diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 45d32767c..99af6d098 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -145,6 +145,12 @@ if [ "$ONLY_UP" = 'yes' ]; then exit 0 fi +sleep 10 + +echo "DOCKER COMPOSE LOGS kafka_1" + +docker-compose $F_OPTIONS logs kafka_1 + if [ "$ATTACH" = 'yes' ]; then docker exec -it "$ERLANG_CONTAINER" bash elif [ "$CONSOLE" = 'yes' ]; then From 5ec4b0a6ca975c4ff8f71a523efcc386aefd961d Mon Sep 17 00:00:00 2001 From: Kjell Winblad Date: Fri, 23 Sep 2022 15:58:22 +0200 Subject: [PATCH 17/17] fix: fix entrypoint in docker compose for Kafka bridge test --- .ci/docker-compose-file/docker-compose-kafka.yaml | 1 + scripts/ct/run.sh | 6 ------ 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index d58f51146..ba0161293 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -15,6 +15,7 @@ services: volumes: - emqx-shared-secret:/var/lib/secret - ./kafka/generate-certs.sh:/bin/generate-certs.sh + entrypoint: /bin/sh command: /bin/generate-certs.sh kdc: hostname: kdc.emqx.net diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 99af6d098..45d32767c 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -145,12 +145,6 @@ if [ "$ONLY_UP" = 'yes' ]; then exit 0 fi -sleep 10 - -echo "DOCKER COMPOSE LOGS kafka_1" - -docker-compose $F_OPTIONS logs kafka_1 - if [ "$ATTACH" = 'yes' ]; then docker exec -it "$ERLANG_CONTAINER" bash elif [ "$CONSOLE" = 'yes' ]; then