diff --git a/.ci/docker-compose-file/docker-compose-kafka.yaml b/.ci/docker-compose-file/docker-compose-kafka.yaml index a4a064c16..3bb7748d5 100644 --- a/.ci/docker-compose-file/docker-compose-kafka.yaml +++ b/.ci/docker-compose-file/docker-compose-kafka.yaml @@ -9,38 +9,13 @@ services: hostname: zookeeper networks: emqx_bridge: - kafka_1: - image: wurstmeister/kafka:2.13-2.7.0 - ports: - - "9092:9092" - - "9093:9093" - container_name: kafka-1.emqx.net - hostname: kafka-1.emqx.net - depends_on: - - "kdc" - environment: - KAFKA_BROKER_ID: 1 - KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 - KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093 - KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093 - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT - KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT - KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI - KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka - KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN - KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" - KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true - KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, - KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer - networks: - emqx_bridge: + ssl_cert_gen: + image: fredrikhgrelland/alpine-jdk11-openssl + container_name: ssl_cert_gen volumes: - emqx-shared-secret:/var/lib/secret - - ./kafka/jaas.conf:/etc/kafka/jaas.conf - - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh - - ./kerberos/krb5.conf:/etc/kdc/krb5.conf - - ./kerberos/krb5.conf:/etc/krb5.conf - command: run_add_scram_users.sh + - ./kafka/generate-certs.sh:/bin/generate-certs.sh + command: /bin/generate-certs.sh kdc: hostname: kdc.emqx.net image: ghcr.io/emqx/emqx-builder/5.0-17:1.13.4-24.2.1-1-ubuntu20.04 @@ -53,4 +28,41 @@ services: - ./kerberos/krb5.conf:/etc/krb5.conf - ./kerberos/run.sh:/usr/bin/run.sh command: run.sh + kafka_1: + image: wurstmeister/kafka:2.13-2.7.0 + ports: + - "9092:9092" + - "9093:9093" + container_name: kafka-1.emqx.net + hostname: kafka-1.emqx.net + depends_on: + - "kdc" + environment: + KAFKA_BROKER_ID: 1 + KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_LISTENERS: PLAINTEXT://:9092,SASL_PLAINTEXT://:9093,SSL://:9094,SASL_SSL://:9095 + KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-1.emqx.net:9092,SASL_PLAINTEXT://kafka-1.emqx.net:9093,SSL://kafka-1.emqx.net:9094,SASL_SSL://kafka-1.emqx.net:9095 + KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,SASL_PLAINTEXT:SASL_PLAINTEXT,SSL:SSL,SASL_SSL:SASL_SSL + KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT + KAFKA_SASL_ENABLED_MECHANISMS: PLAIN,SCRAM-SHA-256,SCRAM-SHA-512,GSSAPI + KAFKA_SASL_KERBEROS_SERVICE_NAME: kafka + KAFKA_SASL_MECHANISM_INTER_BROKER_PROTOCOL: PLAIN + KAFKA_JMX_OPTS: "-Djava.security.auth.login.config=/etc/kafka/jaas.conf" + KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND: true + KAFKA_CREATE_TOPICS: test-topic-one-partition:1:1,test-topic-two-partitions:2:1,test-topic-three-partitions:3:1, + KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.auth.SimpleAclAuthorizer + KAFKA_SSL_TRUSTSTORE_LOCATION: /var/lib/secret/kafka.truststore.jks + KAFKA_SSL_TRUSTSTORE_PASSWORD: password + KAFKA_SSL_KEYSTORE_LOCATION: /var/lib/secret/kafka.keystore.jks + KAFKA_SSL_KEYSTORE_PASSWORD: password + KAFKA_SSL_KEY_PASSWORD: password + networks: + emqx_bridge: + volumes: + - emqx-shared-secret:/var/lib/secret + - ./kafka/jaas.conf:/etc/kafka/jaas.conf + - ./kafka/run_add_scram_users.sh:/bin/run_add_scram_users.sh + - ./kerberos/krb5.conf:/etc/kdc/krb5.conf + - ./kerberos/krb5.conf:/etc/krb5.conf + command: run_add_scram_users.sh diff --git a/.ci/docker-compose-file/kafka/generate-certs.sh b/.ci/docker-compose-file/kafka/generate-certs.sh new file mode 100755 index 000000000..d0ae4a8d0 --- /dev/null +++ b/.ci/docker-compose-file/kafka/generate-certs.sh @@ -0,0 +1,45 @@ +#!/bin/sh + +set -euo pipefail + +set -x + +# Source https://github.com/zmstone/docker-kafka/blob/master/generate-certs.sh + +HOST="*." +DAYS=3650 +PASS="password" + +cd /var/lib/secret/ + +# Delete old files +(rm ca.key ca.crt server.key server.csr server.crt client.key client.csr client.crt server.p12 kafka.keystore.jks kafka.truststore.jks 2>/dev/null || true) + +ls + +echo == Generate self-signed server and client certificates +echo = generate CA +openssl req -new -x509 -keyout ca.key -out ca.crt -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" + +echo = generate server certificate request +openssl req -newkey rsa:2048 -sha256 -keyout server.key -out server.csr -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" + +echo = sign server certificate +openssl x509 -req -CA ca.crt -CAkey ca.key -in server.csr -out server.crt -days $DAYS -CAcreateserial + +echo = generate client certificate request +openssl req -newkey rsa:2048 -sha256 -keyout client.key -out client.csr -days $DAYS -nodes -subj "/C=SE/ST=Stockholm/L=Stockholm/O=brod/OU=test/CN=$HOST" + +echo == sign client certificate +openssl x509 -req -CA ca.crt -CAkey ca.key -in client.csr -out client.crt -days $DAYS -CAserial ca.srl + +echo = Convert self-signed certificate to PKCS#12 format +openssl pkcs12 -export -name $HOST -in server.crt -inkey server.key -out server.p12 -CAfile ca.crt -passout pass:$PASS + +echo = Import PKCS#12 into a java keystore + +echo $PASS | keytool -importkeystore -destkeystore kafka.keystore.jks -srckeystore server.p12 -srcstoretype pkcs12 -alias $HOST -storepass $PASS + +echo = Import CA into java truststore + +echo yes | keytool -keystore kafka.truststore.jks -alias CARoot -import -file ca.crt -storepass $PASS diff --git a/.ci/docker-compose-file/kafka/jaas.conf b/.ci/docker-compose-file/kafka/jaas.conf index f6158950e..8ffe8457d 100644 --- a/.ci/docker-compose-file/kafka/jaas.conf +++ b/.ci/docker-compose-file/kafka/jaas.conf @@ -10,7 +10,7 @@ KafkaServer { com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true - keyTab="/var/lib/secret/kafka.key" + keyTab="/var/lib/secret/kafka.keytab" principal="kafka/kafka-1.emqx.net@KDC.EMQX.NET"; }; diff --git a/.ci/docker-compose-file/kafka/run_add_scram_users.sh b/.ci/docker-compose-file/kafka/run_add_scram_users.sh index 1ffb900a8..32f42a9e9 100755 --- a/.ci/docker-compose-file/kafka/run_add_scram_users.sh +++ b/.ci/docker-compose-file/kafka/run_add_scram_users.sh @@ -5,9 +5,18 @@ set -euo pipefail TIMEOUT=60 +echo "+++++++ Sleep for a while to make sure that old keytab and truststore is deleted ++++++++" + +sleep 5 + echo "+++++++ Wait until Kerberos Keytab is created ++++++++" -timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.key ]; do sleep 1; done' +timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.keytab ]; do sleep 1; done' + + +echo "+++++++ Wait until SSL certs are generated ++++++++" + +timeout $TIMEOUT bash -c 'until [ -f /var/lib/secret/kafka.truststore.jks ]; do sleep 1; done' sleep 3 diff --git a/.ci/docker-compose-file/kerberos/run.sh b/.ci/docker-compose-file/kerberos/run.sh index c5547fb59..85f172207 100755 --- a/.ci/docker-compose-file/kerberos/run.sh +++ b/.ci/docker-compose-file/kerberos/run.sh @@ -3,8 +3,8 @@ echo "Remove old keytabs" -rm -f /var/lib/secret/kafka.key 2>&1 > /dev/null -rm -f /var/lib/secret/rig.key 2>&1 > /dev/null +rm -f /var/lib/secret/kafka.keytab 2>&1 > /dev/null +rm -f /var/lib/secret/rig.keytab 2>&1 > /dev/null echo "Create realm" @@ -18,8 +18,8 @@ kadmin.local -w password -q "add_principal -randkey rig@KDC.EMQX.NET" > /dev/nu echo "Create keytabs" -kadmin.local -w password -q "ktadd -k /var/lib/secret/kafka.key -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null -kadmin.local -w password -q "ktadd -k /var/lib/secret/rig.key -norandkey rig@KDC.EMQX.NET " > /dev/null +kadmin.local -w password -q "ktadd -k /var/lib/secret/kafka.keytab -norandkey kafka/kafka-1.emqx.net@KDC.EMQX.NET " > /dev/null +kadmin.local -w password -q "ktadd -k /var/lib/secret/rig.keytab -norandkey rig@KDC.EMQX.NET " > /dev/null echo STARTING KDC /usr/sbin/krb5kdc -n diff --git a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl index 1c79e9bd8..9ca87d106 100644 --- a/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl +++ b/lib-ee/emqx_ee_bridge/test/emqx_bridge_impl_kafka_producer_SUITE.erl @@ -20,16 +20,84 @@ all() -> emqx_common_test_helpers:all(?MODULE). +wait_until_kafka_is_up() -> + wait_until_kafka_is_up(0). + +wait_until_kafka_is_up(90) -> + ct:fail("Kafka is not up even though we have waited for a while"); +wait_until_kafka_is_up(Attempts) -> + KafkaTopic = "test-topic-one-partition", + case resolve_kafka_offset(kafka_hosts(), KafkaTopic, 0) of + {ok, _} -> + ok; + _ -> + timer:sleep(1000), + wait_until_kafka_is_up(Attempts + 1) + end. + init_per_suite(Config) -> {ok, _} = application:ensure_all_started(brod), {ok, _} = application:ensure_all_started(wolff), + wait_until_kafka_is_up(), Config. end_per_suite(_) -> ok. -do_publish(Conf, KafkaTopic, InstId) -> - Time = erlang:system_time(millisecond), +t_publish_no_auth(_CtConfig) -> + publish_with_and_without_ssl("none"). + +t_publish_sasl_plain(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_plain_settings()). + +t_publish_sasl_scram256(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_scram256_settings()). + +t_publish_sasl_scram512(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_scram512_settings()). + +t_publish_sasl_kerberos(_CtConfig) -> + publish_with_and_without_ssl(valid_sasl_kerberos_settings()). + +publish_with_and_without_ssl(AuthSettings) -> + publish_helper(#{ + auth_settings => AuthSettings, + ssl_settings => #{} + }), + publish_helper(#{ + auth_settings => AuthSettings, + ssl_settings => valid_ssl_settings() + }). + +publish_helper(#{ + auth_settings := AuthSettings, + ssl_settings := SSLSettings +}) -> + HostsString = + case {AuthSettings, SSLSettings} of + {"none", Map} when map_size(Map) =:= 0 -> + kafka_hosts_string(); + {"none", Map} when map_size(Map) =/= 0 -> + kafka_hosts_string_ssl(); + {_, Map} when map_size(Map) =:= 0 -> + kafka_hosts_string_sasl(); + {_, _} -> + kafka_hosts_string_ssl_sasl() + end, + Hash = erlang:phash2([HostsString, AuthSettings, SSLSettings]), + Name = "kafka_bridge_name_" ++ erlang:integer_to_list(Hash), + InstId = emqx_bridge_resource:resource_id("kafka", Name), + KafkaTopic = "test-topic-one-partition", + Conf = config(#{ + "authentication" => AuthSettings, + "kafka_hosts_string" => HostsString, + "kafka_topic" => KafkaTopic, + "instance_id" => InstId, + "ssl" => SSLSettings + }), + %% To make sure we get unique value + timer:sleep(1), + Time = erlang:monotonic_time(), BinTime = integer_to_binary(Time), Msg = #{ clientid => BinTime, @@ -47,79 +115,10 @@ do_publish(Conf, KafkaTopic, InstId) -> ok = ?PRODUCER:on_stop(InstId, State), ok. -t_publish(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "NoAuthInst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => "none", - "kafka_hosts_string" => kafka_hosts_string(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_plain(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLPlainInst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "mechanism" => "plain", - "username" => "emqxuser", - "password" => "password" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_scram256(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram256Inst"), - KafkaTopic = "test-topic-one-partition", - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "mechanism" => "scram_sha_256", - "username" => "emqxuser", - "password" => "password" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_scram512(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLScram512Inst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "mechanism" => "scram_sha_512", - "username" => "emqxuser", - "password" => "password" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - -t_publish_sasl_kerberos(_CtConfig) -> - InstId = emqx_bridge_resource:resource_id("kafka", "SASLKerberosInst"), - KafkaTopic = "test-topic-one-partition", - Conf = config(#{ - "authentication" => #{ - "kerberos_principal" => "rig@KDC.EMQX.NET", - "kerberos_keytab_file" => "/var/lib/secret/rig.key" - }, - "kafka_hosts_string" => kafka_hosts_string_sasl(), - "kafka_topic" => KafkaTopic, - "instance_id" => InstId - }), - do_publish(Conf, KafkaTopic, InstId). - config(Args) -> - {ok, Conf} = hocon:binary(hocon_config(Args)), + ConfText = hocon_config(Args), + ct:pal("Running tests with conf:\n~s", [ConfText]), + {ok, Conf} = hocon:binary(ConfText), #{config := Parsed} = hocon_tconf:check_plain( emqx_ee_bridge_kafka, #{<<"config">> => Conf}, @@ -132,9 +131,15 @@ hocon_config(Args) -> AuthConf = maps:get("authentication", Args), AuthTemplate = iolist_to_binary(hocon_config_template_authentication(AuthConf)), AuthConfRendered = bbmustache:render(AuthTemplate, AuthConf), + SSLConf = maps:get("ssl", Args, #{}), + SSLTemplate = iolist_to_binary(hocon_config_template_ssl(SSLConf)), + SSLConfRendered = bbmustache:render(SSLTemplate, SSLConf), Hocon = bbmustache:render( iolist_to_binary(hocon_config_template()), - Args#{"authentication" => AuthConfRendered} + Args#{ + "authentication" => AuthConfRendered, + "ssl" => SSLConfRendered + } ), Hocon. @@ -144,6 +149,7 @@ hocon_config_template() -> bootstrap_hosts = \"{{ kafka_hosts_string }}\" enable = true authentication = {{{ authentication }}} +ssl = {{{ ssl }}} producer = { mqtt { topic = \"t/#\" @@ -173,12 +179,65 @@ hocon_config_template_authentication(#{"kerberos_principal" := _}) -> } """. +%% erlfmt-ignore +hocon_config_template_ssl(Map) when map_size(Map) =:= 0 -> +""" +{ + enable = false +} +"""; +hocon_config_template_ssl(_) -> +""" +{ + enable = true + cacertfile = \"{{{cacertfile}}}\" + certfile = \"{{{certfile}}}\" + keyfile = \"{{{keyfile}}}\" +} +""". + kafka_hosts_string() -> "kafka-1.emqx.net:9092,". kafka_hosts_string_sasl() -> "kafka-1.emqx.net:9093,". +kafka_hosts_string_ssl() -> + "kafka-1.emqx.net:9094,". + +kafka_hosts_string_ssl_sasl() -> + "kafka-1.emqx.net:9095,". + +valid_ssl_settings() -> + #{ + "cacertfile" => <<"/var/lib/secret/ca.crt">>, + "certfile" => <<"/var/lib/secret/client.crt">>, + "keyfile" => <<"/var/lib/secret/client.key">> + }. + +valid_sasl_plain_settings() -> + #{ + "mechanism" => "plain", + "username" => "emqxuser", + "password" => "password" + }. + +valid_sasl_scram256_settings() -> + (valid_sasl_plain_settings())#{ + "mechanism" => "scram_sha_256" + }. + +valid_sasl_scram512_settings() -> + (valid_sasl_plain_settings())#{ + "mechanism" => "scram_sha_512" + }. + +valid_sasl_kerberos_settings() -> + #{ + "kerberos_principal" => "rig@KDC.EMQX.NET", + "kerberos_keytab_file" => "/var/lib/secret/rig.keytab" + }. + kafka_hosts() -> kpro:parse_endpoints(kafka_hosts_string()).