diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index 85532725d..edde553af 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -13,15 +13,24 @@ services: image: wurstmeister/kafka:2.13-2.7.0 ports: - "9092:9092" + - "9093:9093" container_name: kafka-1.emqx.net hostname: kafka-1.emqx.net environment: KAFKA_BROKER_ID: 1 KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: PLAINTEXT://:9092 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT + KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512 + KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, networks: emqx_bridge: + volumes: + - ./kafka/jaas.conf:/etc/kafka/jaas.conf + - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh + command: run_add_scram_users.sh diff --git a/.ci/docker-compose-file/kafka/jaas.conf b/.ci/docker-compose-file/kafka/jaas.conf new file mode 100644 index 000000000..bf6e6716b --- /dev/null +++ b/.ci/docker-compose-file/kafka/jaas.conf @@ -0,0 +1,9 @@ +KafkaServer { + org.apache.kafka.common.security.plain.PlainLoginModule required + user_admin="password" + user_emqxuser="password"; + + org.apache.kafka.common.security.scram.ScramLoginModule required + username="admin" + password="password"; +}; diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh new file mode 100755 index 000000000..3a3d2ee21 --- /dev/null +++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh @@ -0,0 +1,26 @@ +#!/usr/bin/env bash + +set -euo pipefail + +echo "+++++++ Starting Kafka ++++++++" + +start-kafka.sh & + +SERVER=localhost +PORT1=9092 +PORT2=9093 +TIMEOUT=60 + +echo "+++++++ Wait until Kafka ports are up ++++++++" + +timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 + +timeout $TIMEOUT bash -c 'until printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT2 + +echo "+++++++ Run config commands ++++++++" + +kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'SCRAM-SHA-256=[iterations=8192,password=password],SCRAM-SHA-512=[password=password]' --entity-type users --entity-name emqxuser + +echo "+++++++ Wait until Kafka ports are down ++++++++" + +bash -c 'while printf "" 2>>/dev/null >>/dev/tcp/$0/$1; do sleep 1; done' $SERVER $PORT1 diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 8d01d0d69..0eb393d4d 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -28,12 +28,7 @@ init_per_suite(Config) -> end_per_suite(_) -> ok. -t_publish(_CtConfig) -> - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "kafka_hosts_string" => kafka_hosts_string(), - "kafka_topic" => KafkaTopic - }), +do_publish(Conf, KafkaTopic) -> InstId = <<"InstanceID">>, Time = erlang:system_time(millisecond), BinTime = integer_to_binary(Time), @@ -51,6 +46,54 @@ t_publish(_CtConfig) -> ok = ?PRODUCER:on_stop(InstId, State), ok. +t_publish(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => "none", + "kafka_hosts_string" => kafka_hosts_string(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + +t_publish_sasl_plain(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => #{ + "mechanism" => "plain", + "username" => "emqxuser", + "password" => "password" + }, + "kafka_hosts_string" => kafka_hosts_string_sasl(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + +t_publish_sasl_scram256(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => #{ + "mechanism" => "scram_sha_256", + "username" => "emqxuser", + "password" => "password" + }, + "kafka_hosts_string" => kafka_hosts_string_sasl(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + +t_publish_sasl_scram512(_CtConfig) -> + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => #{ + "mechanism" => "scram_sha_512", + "username" => "emqxuser", + "password" => "password" + }, + "kafka_hosts_string" => kafka_hosts_string_sasl(), + "kafka_topic" => KafkaTopic + }), + do_publish(Conf, KafkaTopic). + config(Args) -> {ok, Conf} = hocon:binary(hocon_config(Args)), #{config := Parsed} = hocon_tconf:check_plain( @@ -61,7 +104,13 @@ config(Args) -> Parsed#{bridge_name => "testbridge"}. hocon_config(Args) -> - Hocon = bbmustache:render(iolist_to_binary(hocon_config_template()), Args), + AuthConf = maps:get("authentication", Args), + AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)), + AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf), + Hocon = bbmustache:render( + iolist_to_binary(hocon_config_template()), + Args#{"authentication" => AuthConfRendered} + ), Hocon. %% erlfmt-ignore @@ -69,7 +118,7 @@ hocon_config_template() -> """ bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true -authentication = none +authentication = {{{ authentication }}} producer = { mqtt { topic = \"t/#\" @@ -80,9 +129,24 @@ producer = { } """. +%% erlfmt-ignore +hocon_config_template_authentication("none") -> + "none"; +hocon_config_template_authentication(#{"mechanism" := _}) -> +""" +{ + mechanism = {{ mechanism }} + password = {{ password }} + username = {{ username }} +} +""". + kafka_hosts_string() -> "kafka-1.emqx.net:9092,". +kafka_hosts_string_sasl() -> + "kafka-1.emqx.net:9093,". + kafka_hosts() -> kpro:parse_endpoints(kafka_hosts_string()). diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index c478ce005..45d32767c 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -14,8 +14,8 @@ help() { echo "--suites SUITE1,SUITE2: Comma separated SUITE names to run. e.g. apps/emqx/test/emqx_SUITE.erl" echo "--console: Start EMQX in console mode" echo "--attach: Attach to the Erlang docker container without running any test case" - echo "--only-up: Keep the testbed running after CT" - echo "--keep-up: Only start the testbed but do not run CT" + echo "--only-up: Only start the testbed but do not run CT" + echo "--keep-up: Keep the testbed running after CT" } WHICH_APP='novalue' @@ -151,7 +151,7 @@ elif [ "$CONSOLE" = 'yes' ]; then docker exec -i $TTY "$ERLANG_CONTAINER" bash -c "make run" else set +e - docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "make ${WHICH_APP}-ct" + docker exec -i $TTY -e EMQX_CT_SUITES="$SUITES" "$ERLANG_CONTAINER" bash -c "BUILD_WITHOUT_QUIC=1 make ${WHICH_APP}-ct" RESULT=$? if [ "$KEEP_UP" = 'yes' ]; then exit $RESULT