From 91368a57ff12500309284a545807e124ac30e534 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 18 Jan 2024 08:56:39 +0800 Subject: [PATCH] test: add es docker CI test --- .ci/docker-compose-file/.env | 9 + .../docker-compose-elastic-search-tls.yaml | 8 + .ci/docker-compose-file/elastic/ca/ca.crt | 20 + .ci/docker-compose-file/elastic/ca/ca.key | 27 ++ .ci/docker-compose-file/elastic/es01/es01.crt | 20 + .ci/docker-compose-file/elastic/es01/es01.key | 27 ++ .ci/docker-compose-file/elastic/instances.yml | 7 + .ci/docker-compose-file/toxiproxy.json | 6 + apps/emqx_bridge_es/docker-ct | 1 + apps/emqx_bridge_es/src/emqx_bridge_es.erl | 8 +- .../test/emqx_bridge_es_SUITE.erl | 372 ++++++++++++++++++ .../test/emqx_bridge_es_SUITE_data/es.crt | 20 + apps/emqx_connector/src/emqx_connector.erl | 3 + rel/i18n/emqx_bridge_es.hocon | 2 +- scripts/ct/run.sh | 3 + 15 files changed, 528 insertions(+), 5 deletions(-) create mode 100644 .ci/docker-compose-file/elastic/ca/ca.crt create mode 100644 .ci/docker-compose-file/elastic/ca/ca.key create mode 100644 .ci/docker-compose-file/elastic/es01/es01.crt create mode 100644 .ci/docker-compose-file/elastic/es01/es01.key create mode 100644 .ci/docker-compose-file/elastic/instances.yml create mode 100644 apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl create mode 100644 apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 3be2b7415..73ec47d00 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -16,4 +16,13 @@ HSTREAMDB_ZK_TAG=3.8.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server SQLSERVER_TAG=2019-CU19-ubuntu-20.04 + +# Password for the 'elastic' user (at least 6 characters) +ELASTIC_PASSWORD="emqx123" +# Password for the 'kibana_system' user (at least 6 characters) +KIBANA_PASSWORD="emqx123" +# Version of Elastic products +ELASTIC_TAG=8.11.4 +LICENSE=basic + TARGET=emqx/emqx diff --git a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml index fef93d785..50491a88a 100644 --- a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml +++ b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml @@ -36,6 +36,8 @@ services: setup: condition: service_healthy image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + container_name: elasticsearch + hostname: elasticsearch volumes: - ./elastic:/usr/share/elasticsearch/config/certs - esdata01:/usr/share/elasticsearch/data @@ -66,6 +68,9 @@ services: interval: 10s timeout: 10s retries: 120 + restart: always + networks: + - emqx_bridge kibana: depends_on: @@ -93,6 +98,9 @@ services: interval: 10s timeout: 10s retries: 120 + restart: always + networks: + - emqx_bridge volumes: esdata01: diff --git a/.ci/docker-compose-file/elastic/ca/ca.crt b/.ci/docker-compose-file/elastic/ca/ca.crt new file mode 100644 index 000000000..2d47d555c --- /dev/null +++ b/.ci/docker-compose-file/elastic/ca/ca.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB +CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu +ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG +A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM +pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14 +cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS +fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq +0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt +MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ +AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME +GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG +SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs +hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X +zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L +mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4 +iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK +cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p +-----END CERTIFICATE----- diff --git a/.ci/docker-compose-file/elastic/ca/ca.key b/.ci/docker-compose-file/elastic/ca/ca.key new file mode 100644 index 000000000..72786207a --- /dev/null +++ b/.ci/docker-compose-file/elastic/ca/ca.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAstJ8IhLq1JCDxTC1dcHlTKT5PwJcGVM6o5Hi+OILGOd1yPUN +OcP7C/buj88K14EwhVjV8yPfjKALafKYT5J9eHJoOf97C3vWHBVRY2CvzKJY5VGz +w4/uHYJPJfsORUUotPiTeZK1Ju62JPZaIwJHkn1/EVO3pChm4wVyBbT8V+RjWnbv +UYnUeRpBzN166ACKK4rPGxTszIBZ/Ysm2J5IatEvh6b9APBVcc5XtCjWz6bCJMVx +9g5z9oj0w8pvcSTZfSL/TD7Y/V2mph+05l3ErTG45ALnSrAsOG0boR6JzNegyFK/ +LF+VHEXAFsidcdaqWWjszHYdlXWABzbjrGcMyQIDAQABAoIBAAZOLXYanmjpIRpX +h7h7oikYEplWDRcQBBvvKZaOyuchhznTKTiZmF0xQ3Ny8J4Ndj9ndODWSZxI6uod +FaGNp+qytwnfgDBVGSVDm6tyRfSkX1fTsA/j3/iupvmO/w9yezdZYgLaCVTyex31 +yVMdchZgYjYDUpEBYzJbV2xL18+GBRmmPjdXumlpcJqcclxjOQJSu/1WCGVfn/e/ +64NQpAm7NSKLqeUl32g0/DvUpmYRfmf7ZjVUjePaJQU6sw5/N+3V9F1hYs8VSWz0 +OMzYIfUcvixw+VWx5bu0nWt98FirhsQPjCTThD+DHP6koXGrdXpeMOQE1YZmoV5T +vP0X+FECgYEA5dsKVDQFL67muqz3CNRVM0xDWACCoa8789hYoxvhd1iO3e4kwXBa +ABPcZckioq+HiQ4UIxC2AhQ1FuTeIUTq7LZ0HtAAdKFi48U4LzmPhNUpG1E/HbJ3 +GQbi4u1cAzGYuhdywktgBhn9bJ4XB7+X3815Y9qKkuRcwtXgKGDy8HkCgYEAxyly +vc7NBkLfIAmkOsm6VXfvfBTEUBUGi6+k1rarTUxWFIgRuk4FHywwWUTdxWBKJz3n +HNNJb/g7CcufdhLTuWVHQtJDxYf2cJjoi+Kf7/i/Qs9Nyhokj5Mnh6KlZQOWXpZd +Gwn/O13NeDxt1TIVO2xp6zY4FhVEPvaHuxsMCtECgYA7/eR/P6iO3nZoCJbdXhXy +spftEw0FSCg8p53SzIcXUCzRrcM4HavP0181zb5VebzFP8Bvun/WoRGOLSPwyP0L +1T8Pf7huuGSIEERuxvY3dC8raxQvGxJMnOiA0/Ss/Lfg8hfIsEWashPb0pMuOYpZ +JlblgfejCSlQzOOZhlxB+QKBgQCKmizRLV9/0QAJAsy5YPR9UJdpCebJOKiyg806 +5Ct5AvwRE9UKjAuCczU+mu+f0fApOSpi5CQCeYVUvtG90UJpjrM2LLCfgoyeNbv4 +xgG6dqlcbHrdgK4bATUMbsOd9g4qy4gGLkHi5df9qkhhi5Y9Iajg2X3U2H4DN3yk +WSFbUQKBgQCLz333qWOuT3OBv+EYxHDQUS4YG+dReUos+v0iPJzu+spnfibBF5IC +RjHIhPsdN1byNB0naXOkkz4tUlLGXv6umFgDtQvy/2rxvxQmUGp/WY1VM2+164Xe +NEWdMEU6UckCoMO77kw8JosKhmXCYaSW5bWwnXuEpOj9WWpwjKtxlA== +-----END RSA PRIVATE KEY----- diff --git a/.ci/docker-compose-file/elastic/es01/es01.crt b/.ci/docker-compose-file/elastic/es01/es01.crt new file mode 100644 index 000000000..002f5727b --- /dev/null +++ b/.ci/docker-compose-file/elastic/es01/es01.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDQDCCAiigAwIBAgIUe90yOBN1KBxOEr2jro3epamZksIwDQYJKoZIhvcNAQEL +BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l +cmF0ZWQgQ0EwHhcNMjQwMTE2MDIzMjIyWhcNMjcwMTE1MDIzMjIyWjAPMQ0wCwYD +VQQDEwRlczAxMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxGEL71pV +j8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/26QuYAkoYL+WuJNfYjg5F/O8W +VVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8mAeqMQPD+4jitOwjOsYzbuFCb +nYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59pHICxS7Cq304LDTRQbNoT8HO +4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8g5tSyfQUDCoEKtTOc3Pe5zCB +vIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r8e9GYmLY/NhxEn3dWTqRhHeg +UD13O8o1aBWonwIDAQABo28wbTAdBgNVHQ4EFgQUXvGJtSf2/mLOK17AzUridtCV +xWwwHwYDVR0jBBgwFoAUwMg5afDWXz3vDQhlZRyUlXJRjoUwIAYDVR0RBBkwF4IJ +bG9jYWxob3N0hwR/AAABggRlczAxMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQAD +ggEBACaNq3ZqrbsGvbEtrf6kJGIsTokTFHeVJUSYmt1ZZzDFLSepXAC/J8gphV45 +B+YSlkDPNTwMYlf7TUYY872zkdqOXN9r0NUx8MzVAX0+rux0RJba5GGUvJGZDNMX +WM5z9ry1KjQSQ1bSoRQOD3QArmBmhvikHjLc97Vqt56N0wA/ztXWOpNZX/TXmast +aXlUbcfQE73Cdq9tW1ATXwbQ2Gf7vVAUT3zjZSZbNdgPuBicGJHf85Fhjm2ND4+R +sjLIOQ2YgVxNHYbueScc6lJM5RNK194K7WrEQnRyGHT3NaDUm0FFNl//aQeq1ZVw +6gaUYlkTFauXwEYMDK901cWFaBE= +-----END CERTIFICATE----- diff --git a/.ci/docker-compose-file/elastic/es01/es01.key b/.ci/docker-compose-file/elastic/es01/es01.key new file mode 100644 index 000000000..b401c5376 --- /dev/null +++ b/.ci/docker-compose-file/elastic/es01/es01.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAxGEL71pVj8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/2 +6QuYAkoYL+WuJNfYjg5F/O8WVVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8m +AeqMQPD+4jitOwjOsYzbuFCbnYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59 +pHICxS7Cq304LDTRQbNoT8HO4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8 +g5tSyfQUDCoEKtTOc3Pe5zCBvIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r +8e9GYmLY/NhxEn3dWTqRhHegUD13O8o1aBWonwIDAQABAoIBADJ3A/Om4az5dcce +96EBU9q+IDBBh2Wr1wzSk9p3sqoM47fLqH5b4dzYwJ1yZw2FwFtFFLw6jqExyexE +7JY8gyAFwPZyJ3pKQHuX1gQuRlYxchB9quU8Kn230LA+w1mT2lXrLj2PzWWvAsAv +m837KiFMpP0O5EjB07u8kLsRr1mG6QQ24Kc8oxd7xLXIiPzSvsOpYwo9hmIWENd5 +kyA7oSa9EmN3TRTkKOHI7cFQ3DqIGdO71waUofKOdx39DyHS2YKWxDE/LUjkS9zw +1AyZG09l4uowyLRqwYhivEq9Za6rdc64yheuHatAM9kC2AOcVcsCPZquIe90k4t1 +L7e9CAECgYEA1W483xTW8ngzxv9MMuPiW+PwVGRpyQrbO6OZOxdWEYfhrZlk5wlW +XK2T85jqooJwMWPTk1F49vZ9WN2KuLkL65GlkEtkFbxmOiFJjXuWwycbFSk05hPs +4AESBYHieaSPcwYhvLeG6g4PFyeqmbAGnKsJaj2ylPwDBOc7LgVlqAECgYEA64wo +gZwaj5SlP8M/OqGH04UVYr1kP/Eq6eiDfMyV5exy+pyzofZyNKUfJfw6sGgyRRHx +OVxlnPMsZ8zbdOXsvUEIeavpwDfQcp5eAURL65I6GMLsx2QpfiN2mDe1MqQW0jct +UleFaURgS84KHLE0+tBBg906jOHGjsE7Q3lyUJ8CgYBYYPev4K9JZGD8bEcfY6Ie +Lvsb1yC+8VHrFkmjYHxxcfUPr89KpGEwq2fynUW72YufyBiajkgq69Ln84U4DNhU +ydDnOXDOV191fsc4YQ8C7LSYRKH1DBcwgwD1at1fRbdpCAb8YHrrfLre+bv5PBzg +zyps5fOHIfwWEbI90lpQAQKBgQDoMMqBMTtxi+r1lucOScrVtFuncOCQs5BE8cIj +1JxzAQk6iBv/LSvZP2gcDq5f1Oaw9YXfsHguJfwA+ozeiAQ9bw0Gu3N52sstIXWz +M/rO5d9FJ2k3CEJqqFSwqkGBAQXKBUA06jeF1DREpX+MVxbNo1rhvMOJusn7UPm1 +gtMwKwKBgQCfRzFO10ITwrw8rcRZwO9Axgqf11V7xn6qpgRxj4h0HOErVTCN1H0b +vE3Pz7cxS/g9vFRP37TuqBLfGVzPt9LAEFwCWPeZJLROBLHyu8XrhTbQx+sI2/pe +SBEJAQAHtYasFTE0sBEKNEY2rIt1c29XZhyhhtNKD9gRN/gB355wLg== +-----END RSA PRIVATE KEY----- diff --git a/.ci/docker-compose-file/elastic/instances.yml b/.ci/docker-compose-file/elastic/instances.yml new file mode 100644 index 000000000..bf1ad4102 --- /dev/null +++ b/.ci/docker-compose-file/elastic/instances.yml @@ -0,0 +1,7 @@ +instances: + - name: es01 + dns: + - es01 + - localhost + ip: + - 127.0.0.1 diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 4b2b6ccf2..c58474039 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -191,5 +191,11 @@ "listen": "0.0.0.0:636", "upstream": "ldap:636", "enabled": true + }, + { + "name": "elasticsearch", + "listen": "0.0.0.0:9200", + "upstream": "elasticsearch:9200", + "enabled": true } ] diff --git a/apps/emqx_bridge_es/docker-ct b/apps/emqx_bridge_es/docker-ct index 80f0d394b..f7cd9ab28 100644 --- a/apps/emqx_bridge_es/docker-ct +++ b/apps/emqx_bridge_es/docker-ct @@ -1 +1,2 @@ toxiproxy +elasticsearch diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 9975eb1b1..569b67dba 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -59,7 +59,7 @@ fields(action_create) -> action(create), index(), id(false), - doc(true), + doc(), routing(), require_alias(), overwrite() @@ -72,7 +72,7 @@ fields(action_update) -> action(update), index(), id(true), - doc(true), + doc(), routing(), require_alias() | http_common_opts() @@ -153,12 +153,12 @@ id(Required) -> } )}. -doc(Required) -> +doc() -> {doc, ?HOCON( binary(), #{ - required => Required, + required => false, example => <<"${payload.doc}">>, desc => ?DESC("config_parameters_doc") } diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl new file mode 100644 index 000000000..e7e2dba28 --- /dev/null +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -0,0 +1,372 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_es_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(TYPE, elasticsearch). +-define(CA, "es.crt"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ProxyName = "elasticsearch", + ESHost = os:getenv("ELASTICSEARCH_HOST", "elasticsearch"), + ESPort = list_to_integer(os:getenv("ELASTICSEARCH_PORT", "9200")), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_es, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + wait_until_elasticsearch_is_up(ESHost, ESPort), + [ + {apps, Apps}, + {proxy_name, ProxyName}, + {es_host, ESHost}, + {es_port, ESPort} + | Config + ]. + +es_checks() -> + case os:getenv("IS_CI") of + "yes" -> 10; + _ -> 1 + end. + +wait_until_elasticsearch_is_up(Host, Port) -> + wait_until_elasticsearch_is_up(es_checks(), Host, Port). + +wait_until_elasticsearch_is_up(0, Host, Port) -> + throw({{Host, Port}, not_available}); +wait_until_elasticsearch_is_up(Count, Host, Port) -> + timer:sleep(1000), + case emqx_common_test_helpers:is_all_tcp_servers_available([{Host, Port}]) of + true -> ok; + false -> wait_until_elasticsearch_is_up(Count - 1, Host, Port) + end. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + %ProxyHost = ?config(proxy_host, Config), + %ProxyPort = ?config(proxy_port, Config), + %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + %ProxyHost = ?config(proxy_host, Config), + %ProxyPort = ?config(proxy_port, Config), + %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok. + +%%------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------- + +check_send_message_with_action(ActionName, ConnectorName) -> + #{payload := _Payload} = send_message(ActionName), + %% ###################################### + %% Check if message is sent to es + %% ###################################### + check_action_metrics(ActionName, ConnectorName). + +send_message(ActionName) -> + %% ###################################### + %% Create message + %% ###################################### + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Payload = #{<<"name">> => <<"emqx">>, <<"release_time">> => BinTime}, + Index = <<"emqx-test-index">>, + Msg = #{ + clientid => BinTime, + payload => Payload, + timestamp => Time, + index => Index + }, + %% ###################################### + %% Send message + %% ###################################### + emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}), + #{payload => Payload}. + +check_action_metrics(ActionName, ConnectorName) -> + ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), + Metrics = + #{ + match => emqx_resource_metrics:matched_get(ActionId), + failed => emqx_resource_metrics:failed_get(ActionId), + queuing => emqx_resource_metrics:queuing_get(ActionId), + dropped => emqx_resource_metrics:dropped_get(ActionId) + }, + ?assertEqual( + #{match => 1, dropped => 0, failed => 0, queuing => 0}, + Metrics + ). + +action_config(ConnectorName) -> + action_config(ConnectorName, _Overrides = #{}). + +action_config(ConnectorName, Overrides) -> + Cfg0 = action(ConnectorName), + emqx_utils_maps:deep_merge(Cfg0, Overrides). + +action(ConnectorName) -> + #{ + <<"description">> => <<"My elasticsearch test action">>, + <<"enable">> => true, + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"action">> => <<"create">>, + <<"doc">> => <<"${payload.doc}">>, + <<"overwrite">> => true + }, + <<"connector">> => ConnectorName, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"30s">>, + <<"query_mode">> => <<"async">> + } + }. + +base_url(Config) -> + Host = ?config(es_host, Config), + Port = ?config(es_port, Config), + iolist_to_binary([ + "https://", + Host, + ":", + integer_to_binary(Port) + ]). + +connector_config(Config) -> + connector_config(_Overrides = #{}, Config). + +connector_config(Overrides, Config) -> + Defaults = + #{ + <<"base_url">> => base_url(Config), + <<"enable">> => true, + <<"authentication">> => #{ + <<"password">> => <<"emqx123">>, + <<"username">> => <<"elastic">> + }, + <<"description">> => <<"My elasticsearch test connector">>, + <<"connect_timeout">> => <<"15s">>, + <<"pool_size">> => 2, + <<"pool_type">> => <<"random">>, + <<"enable_pipelining">> => 100, + <<"ssl">> => #{ + <<"enable">> => true, + <<"hibernate_after">> => <<"5s">>, + <<"cacertfile">> => filename:join(?config(data_dir, Config), ?CA) + } + }, + emqx_utils_maps:deep_merge(Defaults, Overrides). + +create_connector(Name, Config) -> + Res = emqx_connector:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end), + Res. + +create_action(Name, Config) -> + Res = emqx_bridge_v2:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), + Res. + +action_api_spec_props_for_get() -> + #{ + <<"bridge_elasticsearch.get_bridge_v2">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:actions_api_spec_schemas(), + Props. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_create_remove_list(Config) -> + [] = emqx_bridge_v2:list(), + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector, ConnectorConfig), + ActionConfig = action(<<"test_connector">>), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + [ActionInfo] = emqx_bridge_v2:list(), + #{ + name := <<"test_action_1">>, + type := <<"elasticsearch">>, + raw_config := _RawConfig + } = ActionInfo, + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig), + 2 = length(emqx_bridge_v2:list()), + ok = emqx_bridge_v2:remove(?TYPE, test_action_1), + 1 = length(emqx_bridge_v2:list()), + ok = emqx_bridge_v2:remove(?TYPE, test_action_2), + [] = emqx_bridge_v2:list(), + emqx_connector:remove(?TYPE, test_connector), + ok. + +%% Test sending a message to a bridge V2 +t_send_message(Config) -> + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), + ActionConfig = action(<<"test_connector2">>), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + %% Use the action to send a message + check_send_message_with_action(test_action_1, test_connector2), + %% Create a few more bridges with the same connector and test them + BridgeNames1 = [ + list_to_atom("test_bridge_v2_" ++ integer_to_list(I)) + || I <- lists:seq(2, 10) + ], + lists:foreach( + fun(BridgeName) -> + {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, ActionConfig), + check_send_message_with_action(BridgeName, test_connector2) + end, + BridgeNames1 + ), + BridgeNames = [test_bridge_v2_1 | BridgeNames1], + %% Send more messages to the bridges + lists:foreach( + fun(BridgeName) -> + lists:foreach( + fun(_) -> + check_send_message_with_action(BridgeName, test_connector2) + end, + lists:seq(1, 10) + ) + end, + BridgeNames + ), + %% Remove all the bridges + lists:foreach( + fun(BridgeName) -> + ok = emqx_bridge_v2:remove(?TYPE, BridgeName) + end, + BridgeNames + ), + emqx_connector:remove(?TYPE, test_connector2), + ok. + +%% Test that we can get the status of the bridge V2 +t_health_check(Config) -> + BridgeV2Config = action(<<"test_connector3">>), + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config), + #{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2), + %% Check behaviour when bridge does not exist + {error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + ok = emqx_connector:remove(?TYPE, test_connector3), + ok. + +t_bad_url(Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = action(<<"test_connector">>), + ConnectorConfig0 = connector_config(Config), + ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>}, + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, #{ + resource_data := + #{ + status := ?status_disconnected, + error := failed_to_start_elasticsearch_bridge + } + }}, + emqx_connector:lookup(?TYPE, ConnectorName) + ), + ?assertMatch({ok, #{status := ?status_disconnected}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), + ok. + +t_parameters_key_api_spec(_Config) -> + ActionProps = action_api_spec_props_for_get(), + ?assertNot(is_map_key(<<"elasticsearch">>, ActionProps), #{action_props => ActionProps}), + ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}), + ok. + +t_http_api_get(Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = action(ConnectorName), + ConnectorConfig = connector_config(Config), + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"connector">> := ConnectorName, + <<"description">> := <<"My elasticsearch test action">>, + <<"enable">> := true, + <<"error">> := <<>>, + <<"name">> := ActionName, + <<"node_status">> := + [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">>, + <<"status_reason">> := <<>> + } + ], + <<"parameters">> := + #{ + <<"action">> := <<"create">>, + <<"doc">> := <<"${payload.doc}">>, + <<"index">> := <<"${payload.index}">>, + <<"max_retries">> := 2, + <<"overwrite">> := true + }, + <<"resource_opts">> := #{<<"query_mode">> := <<"async">>}, + <<"status">> := <<"connected">>, + <<"status_reason">> := <<>>, + <<"type">> := <<"elasticsearch">> + } + ]}}, + emqx_bridge_v2_testlib:list_bridges_api() + ), + ok. diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt new file mode 100644 index 000000000..2d47d555c --- /dev/null +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB +CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu +ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG +A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM +pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14 +cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS +fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq +0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt +MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ +AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME +GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG +SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs +hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X +zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L +mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4 +iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK +cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p +-----END CERTIFICATE----- diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 27dfcba2a..92cf9439e 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -150,6 +150,9 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> ok = emqx_connector_resource:remove(Type, Name), ?tp(connector_post_config_update_done, #{}), ok; + {error, not_found} -> + ?tp(connector_post_config_update_done, #{}), + ok; {ok, Channels} -> {error, {active_channels, Channels}} end; diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index 62778a712..f5d0f3c02 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -97,7 +97,7 @@ config_parameters_require_alias.label: """_require_alias""" config_parameters_doc.desc: -"""JSON document""" +"""JSON document. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc.""" config_parameters_doc.label: """doc""" diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 7959581a9..af04fd9ee 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -246,6 +246,9 @@ for dep in ${CT_DEPS}; do otel) FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' ) ;; + elasticsearch) + FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1