From 7f5fe9190565157cb381c75a72e0e4127fa94b8c Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 17 Jan 2024 17:14:28 +0800 Subject: [PATCH 1/4] fix: es's action is atom not binary --- .../docker-compose-elastic-search-tls.yaml | 101 ++++++++++++++++++ apps/emqx_bridge_es/src/emqx_bridge_es.erl | 6 +- .../src/emqx_bridge_es_connector.erl | 5 +- .../src/emqx_bridge_iotdb_connector.erl | 4 +- 4 files changed, 111 insertions(+), 5 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-elastic-search-tls.yaml diff --git a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml new file mode 100644 index 000000000..fef93d785 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml @@ -0,0 +1,101 @@ +version: "3.9" + +services: + setup: + image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/elasticsearch/config/certs + user: "0" + command: > + bash -c ' + if [ x${ELASTIC_PASSWORD} == x ]; then + echo "Set the ELASTIC_PASSWORD environment variable in the .env file"; + exit 1; + elif [ x${KIBANA_PASSWORD} == x ]; then + echo "Set the KIBANA_PASSWORD environment variable in the .env file"; + exit 1; + fi; + echo "Setting file permissions" + chown -R root:root config/certs; + find . -type d -exec chmod 750 \{\} \;; + find . -type f -exec chmod 640 \{\} \;; + echo "Waiting for Elasticsearch availability"; + until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done; + echo "Setting kibana_system password"; + until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done; + echo "All done!"; + ' + healthcheck: + test: ["CMD-SHELL", "[ -f config/certs/ca/ca.crt ]"] + interval: 1s + timeout: 5s + retries: 120 + + es01: + depends_on: + setup: + condition: service_healthy + image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/elasticsearch/config/certs + - esdata01:/usr/share/elasticsearch/data + ports: + - 9200:9200 + environment: + - node.name=es01 + - ELASTIC_PASSWORD=${ELASTIC_PASSWORD} + - bootstrap.memory_lock=true + - discovery.type=single-node + - xpack.security.enabled=true + - xpack.security.http.ssl.enabled=true + - xpack.security.http.ssl.key=certs/es01/es01.key + - xpack.security.http.ssl.certificate=certs/es01/es01.crt + - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt + - xpack.license.self_generated.type=${LICENSE} + mem_limit: 1073741824 + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'", + ] + interval: 10s + timeout: 10s + retries: 120 + + kibana: + depends_on: + es01: + condition: service_healthy + image: public.ecr.aws/elastic/kibana:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/kibana/config/certs + - kibanadata:/usr/share/kibana/data + ports: + - 5601:5601 + environment: + - SERVERNAME=kibana + - ELASTICSEARCH_HOSTS=https://es01:9200 + - ELASTICSEARCH_USERNAME=kibana_system + - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD} + - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt + mem_limit: 1073741824 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'", + ] + interval: 10s + timeout: 10s + retries: 120 + +volumes: + esdata01: + driver: local + kibanadata: + driver: local diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index b575f32ed..9975eb1b1 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -98,10 +98,14 @@ action_union_member_selector({value, Value}) -> [?R_REF(action_delete)]; #{<<"action">> := <<"update">>} -> [?R_REF(action_update)]; - _ -> + #{<<"action">> := Action} when is_atom(Action) -> + Value1 = Value#{<<"action">> => atom_to_binary(Action)}, + action_union_member_selector({value, Value1}); + Actual -> Expected = "create | delete | update", throw(#{ field_name => action, + actual => Actual, expected => Expected }) end. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index fe86eac56..7e49aeb55 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -200,7 +200,7 @@ on_start(InstanceId, Config) -> ?SLOG(info, #{ msg => "elasticsearch_bridge_started", instance_id => InstanceId, - request => maps:get(request, State, <<>>) + request => emqx_utils:redact(maps:get(request, State, <<>>)) }), ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}), {ok, State#{channels => #{}}}; @@ -208,7 +208,7 @@ on_start(InstanceId, Config) -> ?SLOG(error, #{ msg => "failed_to_start_elasticsearch_bridge", instance_id => InstanceId, - request => maps:get(request, Config, <<>>), + request => emqx_utils:redact(maps:get(request, Config, <<>>)), reason => Reason }), throw(failed_to_start_elasticsearch_bridge) @@ -354,6 +354,7 @@ add_query_string(Keys, Param0) -> end. to_str(List) when is_list(List) -> List; +to_str(Bin) when is_binary(Bin) -> binary_to_list(Bin); to_str(false) -> "false"; to_str(true) -> "true"; to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index d7b45c18d..ccf97f143 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -212,7 +212,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) -> ?SLOG(info, #{ msg => "iotdb_bridge_started", instance_id => InstanceId, - request => maps:get(request, State, <<>>) + request => emqx_utils:redact(maps:get(request, State, <<>>)) }), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), {ok, State#{iotdb_version => Version, channels => #{}}}; @@ -220,7 +220,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", instance_id => InstanceId, - request => maps:get(request, Config, <<>>), + request => emqx_utils:redact(maps:get(request, Config, <<>>)), reason => Reason }), throw(failed_to_start_iotdb_bridge) From 91368a57ff12500309284a545807e124ac30e534 Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 18 Jan 2024 08:56:39 +0800 Subject: [PATCH 2/4] test: add es docker CI test --- .ci/docker-compose-file/.env | 9 + .../docker-compose-elastic-search-tls.yaml | 8 + .ci/docker-compose-file/elastic/ca/ca.crt | 20 + .ci/docker-compose-file/elastic/ca/ca.key | 27 ++ .ci/docker-compose-file/elastic/es01/es01.crt | 20 + .ci/docker-compose-file/elastic/es01/es01.key | 27 ++ .ci/docker-compose-file/elastic/instances.yml | 7 + .ci/docker-compose-file/toxiproxy.json | 6 + apps/emqx_bridge_es/docker-ct | 1 + apps/emqx_bridge_es/src/emqx_bridge_es.erl | 8 +- .../test/emqx_bridge_es_SUITE.erl | 372 ++++++++++++++++++ .../test/emqx_bridge_es_SUITE_data/es.crt | 20 + apps/emqx_connector/src/emqx_connector.erl | 3 + rel/i18n/emqx_bridge_es.hocon | 2 +- scripts/ct/run.sh | 3 + 15 files changed, 528 insertions(+), 5 deletions(-) create mode 100644 .ci/docker-compose-file/elastic/ca/ca.crt create mode 100644 .ci/docker-compose-file/elastic/ca/ca.key create mode 100644 .ci/docker-compose-file/elastic/es01/es01.crt create mode 100644 .ci/docker-compose-file/elastic/es01/es01.key create mode 100644 .ci/docker-compose-file/elastic/instances.yml create mode 100644 apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl create mode 100644 apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 3be2b7415..73ec47d00 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -16,4 +16,13 @@ HSTREAMDB_ZK_TAG=3.8.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server SQLSERVER_TAG=2019-CU19-ubuntu-20.04 + +# Password for the 'elastic' user (at least 6 characters) +ELASTIC_PASSWORD="emqx123" +# Password for the 'kibana_system' user (at least 6 characters) +KIBANA_PASSWORD="emqx123" +# Version of Elastic products +ELASTIC_TAG=8.11.4 +LICENSE=basic + TARGET=emqx/emqx diff --git a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml index fef93d785..50491a88a 100644 --- a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml +++ b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml @@ -36,6 +36,8 @@ services: setup: condition: service_healthy image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + container_name: elasticsearch + hostname: elasticsearch volumes: - ./elastic:/usr/share/elasticsearch/config/certs - esdata01:/usr/share/elasticsearch/data @@ -66,6 +68,9 @@ services: interval: 10s timeout: 10s retries: 120 + restart: always + networks: + - emqx_bridge kibana: depends_on: @@ -93,6 +98,9 @@ services: interval: 10s timeout: 10s retries: 120 + restart: always + networks: + - emqx_bridge volumes: esdata01: diff --git a/.ci/docker-compose-file/elastic/ca/ca.crt b/.ci/docker-compose-file/elastic/ca/ca.crt new file mode 100644 index 000000000..2d47d555c --- /dev/null +++ b/.ci/docker-compose-file/elastic/ca/ca.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB +CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu +ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG +A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM +pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14 +cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS +fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq +0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt +MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ +AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME +GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG +SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs +hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X +zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L +mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4 +iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK +cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p +-----END CERTIFICATE----- diff --git a/.ci/docker-compose-file/elastic/ca/ca.key b/.ci/docker-compose-file/elastic/ca/ca.key new file mode 100644 index 000000000..72786207a --- /dev/null +++ b/.ci/docker-compose-file/elastic/ca/ca.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAstJ8IhLq1JCDxTC1dcHlTKT5PwJcGVM6o5Hi+OILGOd1yPUN +OcP7C/buj88K14EwhVjV8yPfjKALafKYT5J9eHJoOf97C3vWHBVRY2CvzKJY5VGz +w4/uHYJPJfsORUUotPiTeZK1Ju62JPZaIwJHkn1/EVO3pChm4wVyBbT8V+RjWnbv +UYnUeRpBzN166ACKK4rPGxTszIBZ/Ysm2J5IatEvh6b9APBVcc5XtCjWz6bCJMVx +9g5z9oj0w8pvcSTZfSL/TD7Y/V2mph+05l3ErTG45ALnSrAsOG0boR6JzNegyFK/ +LF+VHEXAFsidcdaqWWjszHYdlXWABzbjrGcMyQIDAQABAoIBAAZOLXYanmjpIRpX +h7h7oikYEplWDRcQBBvvKZaOyuchhznTKTiZmF0xQ3Ny8J4Ndj9ndODWSZxI6uod +FaGNp+qytwnfgDBVGSVDm6tyRfSkX1fTsA/j3/iupvmO/w9yezdZYgLaCVTyex31 +yVMdchZgYjYDUpEBYzJbV2xL18+GBRmmPjdXumlpcJqcclxjOQJSu/1WCGVfn/e/ +64NQpAm7NSKLqeUl32g0/DvUpmYRfmf7ZjVUjePaJQU6sw5/N+3V9F1hYs8VSWz0 +OMzYIfUcvixw+VWx5bu0nWt98FirhsQPjCTThD+DHP6koXGrdXpeMOQE1YZmoV5T +vP0X+FECgYEA5dsKVDQFL67muqz3CNRVM0xDWACCoa8789hYoxvhd1iO3e4kwXBa +ABPcZckioq+HiQ4UIxC2AhQ1FuTeIUTq7LZ0HtAAdKFi48U4LzmPhNUpG1E/HbJ3 +GQbi4u1cAzGYuhdywktgBhn9bJ4XB7+X3815Y9qKkuRcwtXgKGDy8HkCgYEAxyly +vc7NBkLfIAmkOsm6VXfvfBTEUBUGi6+k1rarTUxWFIgRuk4FHywwWUTdxWBKJz3n +HNNJb/g7CcufdhLTuWVHQtJDxYf2cJjoi+Kf7/i/Qs9Nyhokj5Mnh6KlZQOWXpZd +Gwn/O13NeDxt1TIVO2xp6zY4FhVEPvaHuxsMCtECgYA7/eR/P6iO3nZoCJbdXhXy +spftEw0FSCg8p53SzIcXUCzRrcM4HavP0181zb5VebzFP8Bvun/WoRGOLSPwyP0L +1T8Pf7huuGSIEERuxvY3dC8raxQvGxJMnOiA0/Ss/Lfg8hfIsEWashPb0pMuOYpZ +JlblgfejCSlQzOOZhlxB+QKBgQCKmizRLV9/0QAJAsy5YPR9UJdpCebJOKiyg806 +5Ct5AvwRE9UKjAuCczU+mu+f0fApOSpi5CQCeYVUvtG90UJpjrM2LLCfgoyeNbv4 +xgG6dqlcbHrdgK4bATUMbsOd9g4qy4gGLkHi5df9qkhhi5Y9Iajg2X3U2H4DN3yk +WSFbUQKBgQCLz333qWOuT3OBv+EYxHDQUS4YG+dReUos+v0iPJzu+spnfibBF5IC +RjHIhPsdN1byNB0naXOkkz4tUlLGXv6umFgDtQvy/2rxvxQmUGp/WY1VM2+164Xe +NEWdMEU6UckCoMO77kw8JosKhmXCYaSW5bWwnXuEpOj9WWpwjKtxlA== +-----END RSA PRIVATE KEY----- diff --git a/.ci/docker-compose-file/elastic/es01/es01.crt b/.ci/docker-compose-file/elastic/es01/es01.crt new file mode 100644 index 000000000..002f5727b --- /dev/null +++ b/.ci/docker-compose-file/elastic/es01/es01.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDQDCCAiigAwIBAgIUe90yOBN1KBxOEr2jro3epamZksIwDQYJKoZIhvcNAQEL +BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l +cmF0ZWQgQ0EwHhcNMjQwMTE2MDIzMjIyWhcNMjcwMTE1MDIzMjIyWjAPMQ0wCwYD +VQQDEwRlczAxMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxGEL71pV +j8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/26QuYAkoYL+WuJNfYjg5F/O8W +VVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8mAeqMQPD+4jitOwjOsYzbuFCb +nYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59pHICxS7Cq304LDTRQbNoT8HO +4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8g5tSyfQUDCoEKtTOc3Pe5zCB +vIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r8e9GYmLY/NhxEn3dWTqRhHeg +UD13O8o1aBWonwIDAQABo28wbTAdBgNVHQ4EFgQUXvGJtSf2/mLOK17AzUridtCV +xWwwHwYDVR0jBBgwFoAUwMg5afDWXz3vDQhlZRyUlXJRjoUwIAYDVR0RBBkwF4IJ +bG9jYWxob3N0hwR/AAABggRlczAxMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQAD +ggEBACaNq3ZqrbsGvbEtrf6kJGIsTokTFHeVJUSYmt1ZZzDFLSepXAC/J8gphV45 +B+YSlkDPNTwMYlf7TUYY872zkdqOXN9r0NUx8MzVAX0+rux0RJba5GGUvJGZDNMX +WM5z9ry1KjQSQ1bSoRQOD3QArmBmhvikHjLc97Vqt56N0wA/ztXWOpNZX/TXmast +aXlUbcfQE73Cdq9tW1ATXwbQ2Gf7vVAUT3zjZSZbNdgPuBicGJHf85Fhjm2ND4+R +sjLIOQ2YgVxNHYbueScc6lJM5RNK194K7WrEQnRyGHT3NaDUm0FFNl//aQeq1ZVw +6gaUYlkTFauXwEYMDK901cWFaBE= +-----END CERTIFICATE----- diff --git a/.ci/docker-compose-file/elastic/es01/es01.key b/.ci/docker-compose-file/elastic/es01/es01.key new file mode 100644 index 000000000..b401c5376 --- /dev/null +++ b/.ci/docker-compose-file/elastic/es01/es01.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAxGEL71pVj8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/2 +6QuYAkoYL+WuJNfYjg5F/O8WVVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8m +AeqMQPD+4jitOwjOsYzbuFCbnYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59 +pHICxS7Cq304LDTRQbNoT8HO4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8 +g5tSyfQUDCoEKtTOc3Pe5zCBvIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r +8e9GYmLY/NhxEn3dWTqRhHegUD13O8o1aBWonwIDAQABAoIBADJ3A/Om4az5dcce +96EBU9q+IDBBh2Wr1wzSk9p3sqoM47fLqH5b4dzYwJ1yZw2FwFtFFLw6jqExyexE +7JY8gyAFwPZyJ3pKQHuX1gQuRlYxchB9quU8Kn230LA+w1mT2lXrLj2PzWWvAsAv +m837KiFMpP0O5EjB07u8kLsRr1mG6QQ24Kc8oxd7xLXIiPzSvsOpYwo9hmIWENd5 +kyA7oSa9EmN3TRTkKOHI7cFQ3DqIGdO71waUofKOdx39DyHS2YKWxDE/LUjkS9zw +1AyZG09l4uowyLRqwYhivEq9Za6rdc64yheuHatAM9kC2AOcVcsCPZquIe90k4t1 +L7e9CAECgYEA1W483xTW8ngzxv9MMuPiW+PwVGRpyQrbO6OZOxdWEYfhrZlk5wlW +XK2T85jqooJwMWPTk1F49vZ9WN2KuLkL65GlkEtkFbxmOiFJjXuWwycbFSk05hPs +4AESBYHieaSPcwYhvLeG6g4PFyeqmbAGnKsJaj2ylPwDBOc7LgVlqAECgYEA64wo +gZwaj5SlP8M/OqGH04UVYr1kP/Eq6eiDfMyV5exy+pyzofZyNKUfJfw6sGgyRRHx +OVxlnPMsZ8zbdOXsvUEIeavpwDfQcp5eAURL65I6GMLsx2QpfiN2mDe1MqQW0jct +UleFaURgS84KHLE0+tBBg906jOHGjsE7Q3lyUJ8CgYBYYPev4K9JZGD8bEcfY6Ie +Lvsb1yC+8VHrFkmjYHxxcfUPr89KpGEwq2fynUW72YufyBiajkgq69Ln84U4DNhU +ydDnOXDOV191fsc4YQ8C7LSYRKH1DBcwgwD1at1fRbdpCAb8YHrrfLre+bv5PBzg +zyps5fOHIfwWEbI90lpQAQKBgQDoMMqBMTtxi+r1lucOScrVtFuncOCQs5BE8cIj +1JxzAQk6iBv/LSvZP2gcDq5f1Oaw9YXfsHguJfwA+ozeiAQ9bw0Gu3N52sstIXWz +M/rO5d9FJ2k3CEJqqFSwqkGBAQXKBUA06jeF1DREpX+MVxbNo1rhvMOJusn7UPm1 +gtMwKwKBgQCfRzFO10ITwrw8rcRZwO9Axgqf11V7xn6qpgRxj4h0HOErVTCN1H0b +vE3Pz7cxS/g9vFRP37TuqBLfGVzPt9LAEFwCWPeZJLROBLHyu8XrhTbQx+sI2/pe +SBEJAQAHtYasFTE0sBEKNEY2rIt1c29XZhyhhtNKD9gRN/gB355wLg== +-----END RSA PRIVATE KEY----- diff --git a/.ci/docker-compose-file/elastic/instances.yml b/.ci/docker-compose-file/elastic/instances.yml new file mode 100644 index 000000000..bf1ad4102 --- /dev/null +++ b/.ci/docker-compose-file/elastic/instances.yml @@ -0,0 +1,7 @@ +instances: + - name: es01 + dns: + - es01 + - localhost + ip: + - 127.0.0.1 diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 4b2b6ccf2..c58474039 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -191,5 +191,11 @@ "listen": "0.0.0.0:636", "upstream": "ldap:636", "enabled": true + }, + { + "name": "elasticsearch", + "listen": "0.0.0.0:9200", + "upstream": "elasticsearch:9200", + "enabled": true } ] diff --git a/apps/emqx_bridge_es/docker-ct b/apps/emqx_bridge_es/docker-ct index 80f0d394b..f7cd9ab28 100644 --- a/apps/emqx_bridge_es/docker-ct +++ b/apps/emqx_bridge_es/docker-ct @@ -1 +1,2 @@ toxiproxy +elasticsearch diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 9975eb1b1..569b67dba 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -59,7 +59,7 @@ fields(action_create) -> action(create), index(), id(false), - doc(true), + doc(), routing(), require_alias(), overwrite() @@ -72,7 +72,7 @@ fields(action_update) -> action(update), index(), id(true), - doc(true), + doc(), routing(), require_alias() | http_common_opts() @@ -153,12 +153,12 @@ id(Required) -> } )}. -doc(Required) -> +doc() -> {doc, ?HOCON( binary(), #{ - required => Required, + required => false, example => <<"${payload.doc}">>, desc => ?DESC("config_parameters_doc") } diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl new file mode 100644 index 000000000..e7e2dba28 --- /dev/null +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -0,0 +1,372 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_es_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(TYPE, elasticsearch). +-define(CA, "es.crt"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ProxyName = "elasticsearch", + ESHost = os:getenv("ELASTICSEARCH_HOST", "elasticsearch"), + ESPort = list_to_integer(os:getenv("ELASTICSEARCH_PORT", "9200")), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_es, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + wait_until_elasticsearch_is_up(ESHost, ESPort), + [ + {apps, Apps}, + {proxy_name, ProxyName}, + {es_host, ESHost}, + {es_port, ESPort} + | Config + ]. + +es_checks() -> + case os:getenv("IS_CI") of + "yes" -> 10; + _ -> 1 + end. + +wait_until_elasticsearch_is_up(Host, Port) -> + wait_until_elasticsearch_is_up(es_checks(), Host, Port). + +wait_until_elasticsearch_is_up(0, Host, Port) -> + throw({{Host, Port}, not_available}); +wait_until_elasticsearch_is_up(Count, Host, Port) -> + timer:sleep(1000), + case emqx_common_test_helpers:is_all_tcp_servers_available([{Host, Port}]) of + true -> ok; + false -> wait_until_elasticsearch_is_up(Count - 1, Host, Port) + end. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + %ProxyHost = ?config(proxy_host, Config), + %ProxyPort = ?config(proxy_port, Config), + %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + %ProxyHost = ?config(proxy_host, Config), + %ProxyPort = ?config(proxy_port, Config), + %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok. + +%%------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------- + +check_send_message_with_action(ActionName, ConnectorName) -> + #{payload := _Payload} = send_message(ActionName), + %% ###################################### + %% Check if message is sent to es + %% ###################################### + check_action_metrics(ActionName, ConnectorName). + +send_message(ActionName) -> + %% ###################################### + %% Create message + %% ###################################### + Time = erlang:unique_integer(), + BinTime = integer_to_binary(Time), + Payload = #{<<"name">> => <<"emqx">>, <<"release_time">> => BinTime}, + Index = <<"emqx-test-index">>, + Msg = #{ + clientid => BinTime, + payload => Payload, + timestamp => Time, + index => Index + }, + %% ###################################### + %% Send message + %% ###################################### + emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}), + #{payload => Payload}. + +check_action_metrics(ActionName, ConnectorName) -> + ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), + Metrics = + #{ + match => emqx_resource_metrics:matched_get(ActionId), + failed => emqx_resource_metrics:failed_get(ActionId), + queuing => emqx_resource_metrics:queuing_get(ActionId), + dropped => emqx_resource_metrics:dropped_get(ActionId) + }, + ?assertEqual( + #{match => 1, dropped => 0, failed => 0, queuing => 0}, + Metrics + ). + +action_config(ConnectorName) -> + action_config(ConnectorName, _Overrides = #{}). + +action_config(ConnectorName, Overrides) -> + Cfg0 = action(ConnectorName), + emqx_utils_maps:deep_merge(Cfg0, Overrides). + +action(ConnectorName) -> + #{ + <<"description">> => <<"My elasticsearch test action">>, + <<"enable">> => true, + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"action">> => <<"create">>, + <<"doc">> => <<"${payload.doc}">>, + <<"overwrite">> => true + }, + <<"connector">> => ConnectorName, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"30s">>, + <<"query_mode">> => <<"async">> + } + }. + +base_url(Config) -> + Host = ?config(es_host, Config), + Port = ?config(es_port, Config), + iolist_to_binary([ + "https://", + Host, + ":", + integer_to_binary(Port) + ]). + +connector_config(Config) -> + connector_config(_Overrides = #{}, Config). + +connector_config(Overrides, Config) -> + Defaults = + #{ + <<"base_url">> => base_url(Config), + <<"enable">> => true, + <<"authentication">> => #{ + <<"password">> => <<"emqx123">>, + <<"username">> => <<"elastic">> + }, + <<"description">> => <<"My elasticsearch test connector">>, + <<"connect_timeout">> => <<"15s">>, + <<"pool_size">> => 2, + <<"pool_type">> => <<"random">>, + <<"enable_pipelining">> => 100, + <<"ssl">> => #{ + <<"enable">> => true, + <<"hibernate_after">> => <<"5s">>, + <<"cacertfile">> => filename:join(?config(data_dir, Config), ?CA) + } + }, + emqx_utils_maps:deep_merge(Defaults, Overrides). + +create_connector(Name, Config) -> + Res = emqx_connector:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end), + Res. + +create_action(Name, Config) -> + Res = emqx_bridge_v2:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), + Res. + +action_api_spec_props_for_get() -> + #{ + <<"bridge_elasticsearch.get_bridge_v2">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:actions_api_spec_schemas(), + Props. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_create_remove_list(Config) -> + [] = emqx_bridge_v2:list(), + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector, ConnectorConfig), + ActionConfig = action(<<"test_connector">>), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + [ActionInfo] = emqx_bridge_v2:list(), + #{ + name := <<"test_action_1">>, + type := <<"elasticsearch">>, + raw_config := _RawConfig + } = ActionInfo, + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig), + 2 = length(emqx_bridge_v2:list()), + ok = emqx_bridge_v2:remove(?TYPE, test_action_1), + 1 = length(emqx_bridge_v2:list()), + ok = emqx_bridge_v2:remove(?TYPE, test_action_2), + [] = emqx_bridge_v2:list(), + emqx_connector:remove(?TYPE, test_connector), + ok. + +%% Test sending a message to a bridge V2 +t_send_message(Config) -> + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), + ActionConfig = action(<<"test_connector2">>), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + %% Use the action to send a message + check_send_message_with_action(test_action_1, test_connector2), + %% Create a few more bridges with the same connector and test them + BridgeNames1 = [ + list_to_atom("test_bridge_v2_" ++ integer_to_list(I)) + || I <- lists:seq(2, 10) + ], + lists:foreach( + fun(BridgeName) -> + {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, ActionConfig), + check_send_message_with_action(BridgeName, test_connector2) + end, + BridgeNames1 + ), + BridgeNames = [test_bridge_v2_1 | BridgeNames1], + %% Send more messages to the bridges + lists:foreach( + fun(BridgeName) -> + lists:foreach( + fun(_) -> + check_send_message_with_action(BridgeName, test_connector2) + end, + lists:seq(1, 10) + ) + end, + BridgeNames + ), + %% Remove all the bridges + lists:foreach( + fun(BridgeName) -> + ok = emqx_bridge_v2:remove(?TYPE, BridgeName) + end, + BridgeNames + ), + emqx_connector:remove(?TYPE, test_connector2), + ok. + +%% Test that we can get the status of the bridge V2 +t_health_check(Config) -> + BridgeV2Config = action(<<"test_connector3">>), + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config), + #{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2), + %% Check behaviour when bridge does not exist + {error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + ok = emqx_connector:remove(?TYPE, test_connector3), + ok. + +t_bad_url(Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = action(<<"test_connector">>), + ConnectorConfig0 = connector_config(Config), + ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>}, + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, #{ + resource_data := + #{ + status := ?status_disconnected, + error := failed_to_start_elasticsearch_bridge + } + }}, + emqx_connector:lookup(?TYPE, ConnectorName) + ), + ?assertMatch({ok, #{status := ?status_disconnected}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), + ok. + +t_parameters_key_api_spec(_Config) -> + ActionProps = action_api_spec_props_for_get(), + ?assertNot(is_map_key(<<"elasticsearch">>, ActionProps), #{action_props => ActionProps}), + ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}), + ok. + +t_http_api_get(Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = action(ConnectorName), + ConnectorConfig = connector_config(Config), + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"connector">> := ConnectorName, + <<"description">> := <<"My elasticsearch test action">>, + <<"enable">> := true, + <<"error">> := <<>>, + <<"name">> := ActionName, + <<"node_status">> := + [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">>, + <<"status_reason">> := <<>> + } + ], + <<"parameters">> := + #{ + <<"action">> := <<"create">>, + <<"doc">> := <<"${payload.doc}">>, + <<"index">> := <<"${payload.index}">>, + <<"max_retries">> := 2, + <<"overwrite">> := true + }, + <<"resource_opts">> := #{<<"query_mode">> := <<"async">>}, + <<"status">> := <<"connected">>, + <<"status_reason">> := <<>>, + <<"type">> := <<"elasticsearch">> + } + ]}}, + emqx_bridge_v2_testlib:list_bridges_api() + ), + ok. diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt new file mode 100644 index 000000000..2d47d555c --- /dev/null +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB +CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu +ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG +A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM +pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14 +cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS +fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq +0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt +MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ +AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME +GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG +SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs +hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X +zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L +mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4 +iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK +cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p +-----END CERTIFICATE----- diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 27dfcba2a..92cf9439e 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -150,6 +150,9 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> ok = emqx_connector_resource:remove(Type, Name), ?tp(connector_post_config_update_done, #{}), ok; + {error, not_found} -> + ?tp(connector_post_config_update_done, #{}), + ok; {ok, Channels} -> {error, {active_channels, Channels}} end; diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index 62778a712..f5d0f3c02 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -97,7 +97,7 @@ config_parameters_require_alias.label: """_require_alias""" config_parameters_doc.desc: -"""JSON document""" +"""JSON document. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc.""" config_parameters_doc.label: """doc""" diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 7959581a9..af04fd9ee 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -246,6 +246,9 @@ for dep in ${CT_DEPS}; do otel) FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' ) ;; + elasticsearch) + FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 From 59797cfea7a2f79da3d95e15c1279d8c0a9deafd Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 18 Jan 2024 15:43:02 +0800 Subject: [PATCH 3/4] feat: es's update support doc_as_upsert --- apps/emqx_bridge_es/src/emqx_bridge_es.erl | 12 ++ .../src/emqx_bridge_es_connector.erl | 33 +++++- .../test/emqx_bridge_es_SUITE.erl | 109 ++++++++++-------- .../src/emqx_bridge_http_connector.erl | 42 ++++--- rel/i18n/emqx_bridge_es.hocon | 6 + 5 files changed, 129 insertions(+), 73 deletions(-) diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index 569b67dba..032439574 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -73,6 +73,7 @@ fields(action_update) -> index(), id(true), doc(), + doc_as_upsert(), routing(), require_alias() | http_common_opts() @@ -172,6 +173,17 @@ http_common_opts() -> emqx_bridge_http_schema:fields("parameters_opts") ). +doc_as_upsert() -> + {doc_as_upsert, + ?HOCON( + boolean(), + #{ + required => false, + default => false, + desc => ?DESC("config_doc_as_upsert") + } + )}. + routing() -> {routing, ?HOCON( diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index 7e49aeb55..8b68af10f 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -33,6 +33,8 @@ connector_example_values/0 ]). +-export([render_template/2]). + %% emqx_connector_resource behaviour callbacks -export([connector_config/2]). @@ -286,8 +288,12 @@ on_add_channel( method => method(Parameter), body => get_body_template(Parameter) }, + ChannelConfig = #{ + parameters => Parameter1, + render_template_func => fun ?MODULE:render_template/2 + }, {ok, State} = emqx_bridge_http_connector:on_add_channel( - InstanceId, State0, ChannelId, #{parameters => Parameter1} + InstanceId, State0, ChannelId, ChannelConfig ), Channel = Parameter1, Channels2 = Channels#{ChannelId => Channel}, @@ -310,9 +316,23 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> {error, not_exists} end. +render_template(Template, Msg) -> + % Ignoring errors here, undefined bindings will be replaced with empty string. + Opts = #{var_trans => fun to_string/2}, + {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}, Opts), + String. + %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- + +to_string(Name, Value) -> + emqx_template:to_string(render_var(Name, Value)). +render_var(_, undefined) -> + % NOTE Any allowed but undefined binding will be replaced with empty string + <<>>; +render_var(_Name, Value) -> + Value. %% delete DELETE //_doc/<_id> path(#{action := delete, id := Id, index := Index} = Action) -> BasePath = ["/", Index, "/_doc/", Id], @@ -370,5 +390,12 @@ handle_response({ok, Code, Body}) -> handle_response({error, _} = Error) -> Error. -get_body_template(#{doc := Doc}) -> Doc; -get_body_template(_) -> undefined. +get_body_template(#{action := update, doc := Doc} = Template) -> + case maps:get(doc_as_upsert, Template, false) of + false -> <<"{\"doc\":", Doc/binary, "}">>; + true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">> + end; +get_body_template(#{doc := Doc}) -> + Doc; +get_body_template(_) -> + undefined. diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl index e7e2dba28..a9ff70957 100644 --- a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -103,45 +103,46 @@ end_per_testcase(_TestCase, _Config) -> %% Helper fns %%------------------------------------------------------------------------------------- -check_send_message_with_action(ActionName, ConnectorName) -> - #{payload := _Payload} = send_message(ActionName), +check_send_message_with_action(Topic, ActionName, ConnectorName) -> + send_message(Topic), %% ###################################### %% Check if message is sent to es %% ###################################### + timer:sleep(500), check_action_metrics(ActionName, ConnectorName). -send_message(ActionName) -> - %% ###################################### - %% Create message - %% ###################################### - Time = erlang:unique_integer(), - BinTime = integer_to_binary(Time), - Payload = #{<<"name">> => <<"emqx">>, <<"release_time">> => BinTime}, +send_message(Topic) -> + Now = emqx_utils_calendar:now_to_rfc3339(microsecond), + Doc = #{<<"name">> => <<"emqx">>, <<"release_date">> => Now}, Index = <<"emqx-test-index">>, - Msg = #{ - clientid => BinTime, - payload => Payload, - timestamp => Time, - index => Index - }, - %% ###################################### - %% Send message - %% ###################################### - emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}), - #{payload => Payload}. + Payload = emqx_utils_json:encode(#{doc => Doc, index => Index}), + + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + {ok, Client} = emqtt:start_link([{clientid, ClientId}, {port, 1883}]), + {ok, _} = emqtt:connect(Client), + ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]), + ok. check_action_metrics(ActionName, ConnectorName) -> ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), Metrics = #{ match => emqx_resource_metrics:matched_get(ActionId), + success => emqx_resource_metrics:success_get(ActionId), failed => emqx_resource_metrics:failed_get(ActionId), queuing => emqx_resource_metrics:queuing_get(ActionId), dropped => emqx_resource_metrics:dropped_get(ActionId) }, ?assertEqual( - #{match => 1, dropped => 0, failed => 0, queuing => 0}, - Metrics + #{ + match => 1, + success => 1, + dropped => 0, + failed => 0, + queuing => 0 + }, + Metrics, + {ActionName, ConnectorName, ActionId} ). action_config(ConnectorName) -> @@ -164,7 +165,7 @@ action(ConnectorName) -> <<"connector">> => ConnectorName, <<"resource_opts">> => #{ <<"health_check_interval">> => <<"30s">>, - <<"query_mode">> => <<"async">> + <<"query_mode">> => <<"sync">> } }. @@ -235,7 +236,8 @@ t_create_remove_list(Config) -> #{ name := <<"test_action_1">>, type := <<"elasticsearch">>, - raw_config := _RawConfig + raw_config := _, + status := connected } = ActionInfo, {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig), 2 = length(emqx_bridge_v2:list()), @@ -252,39 +254,44 @@ t_send_message(Config) -> {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), ActionConfig = action(<<"test_connector2">>), {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + Rule = #{ + id => <<"rule:t_es">>, + sql => <<"SELECT\n *\nFROM\n \"es/#\"">>, + actions => [<<"elasticsearch:test_action_1">>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule), %% Use the action to send a message - check_send_message_with_action(test_action_1, test_connector2), + check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2), %% Create a few more bridges with the same connector and test them - BridgeNames1 = [ - list_to_atom("test_bridge_v2_" ++ integer_to_list(I)) - || I <- lists:seq(2, 10) - ], - lists:foreach( - fun(BridgeName) -> - {ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, ActionConfig), - check_send_message_with_action(BridgeName, test_connector2) - end, - BridgeNames1 - ), - BridgeNames = [test_bridge_v2_1 | BridgeNames1], - %% Send more messages to the bridges - lists:foreach( - fun(BridgeName) -> - lists:foreach( - fun(_) -> - check_send_message_with_action(BridgeName, test_connector2) - end, - lists:seq(1, 10) - ) - end, - BridgeNames - ), + ActionNames1 = + lists:foldl( + fun(I, Acc) -> + Seq = integer_to_binary(I), + ActionNameStr = "test_action_" ++ integer_to_list(I), + ActionName = list_to_atom(ActionNameStr), + {ok, _} = emqx_bridge_v2:create(?TYPE, ActionName, ActionConfig), + Rule1 = #{ + id => <<"rule:t_es", Seq/binary>>, + sql => <<"SELECT\n *\nFROM\n \"es/", Seq/binary, "\"">>, + actions => [<<"elasticsearch:", (list_to_binary(ActionNameStr))/binary>>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule1), + Topic = <<"es/", Seq/binary>>, + check_send_message_with_action(Topic, ActionName, test_connector2), + [ActionName | Acc] + end, + [], + lists:seq(2, 10) + ), + ActionNames = [test_action_1 | ActionNames1], %% Remove all the bridges lists:foreach( fun(BridgeName) -> ok = emqx_bridge_v2:remove(?TYPE, BridgeName) end, - BridgeNames + ActionNames ), emqx_connector:remove(?TYPE, test_connector2), ok. @@ -361,7 +368,7 @@ t_http_api_get(Config) -> <<"max_retries">> := 2, <<"overwrite">> := true }, - <<"resource_opts">> := #{<<"query_mode">> := <<"async">>}, + <<"resource_opts">> := #{<<"query_mode">> := <<"sync">>}, <<"status">> := <<"connected">>, <<"status_reason">> := <<>>, <<"type">> := <<"elasticsearch">> diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 8f54694e9..81acec602 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -266,7 +266,9 @@ on_add_channel( ) -> InstalledActions = maps:get(installed_actions, OldState, #{}), {ok, ActionState} = do_create_http_action(ActionConfig), - NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions), + RenderTemplate = maps:get(render_template_func, ActionConfig, fun render_template/2), + ActionState1 = ActionState#{render_template_func => RenderTemplate}, + NewInstalledActions = maps:put(ActionId, ActionState1, InstalledActions), NewState = maps:put(installed_actions, NewInstalledActions, OldState), {ok, NewState}. @@ -631,9 +633,10 @@ parse_template(String) -> process_request_and_action(Request, ActionState, Msg) -> MethodTemplate = maps:get(method, ActionState), - Method = make_method(render_template_string(MethodTemplate, Msg)), - PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), - PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)), + RenderTmplFunc = maps:get(render_template_func, ActionState), + Method = make_method(render_template_string(MethodTemplate, RenderTmplFunc, Msg)), + PathPrefix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, Request), Msg)), + PathSuffix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, ActionState), Msg)), Path = case PathSuffix of @@ -644,11 +647,11 @@ process_request_and_action(Request, ActionState, Msg) -> HeadersTemplate1 = maps:get(headers, Request), HeadersTemplate2 = maps:get(headers, ActionState), Headers = merge_proplist( - render_headers(HeadersTemplate1, Msg), - render_headers(HeadersTemplate2, Msg) + render_headers(HeadersTemplate1, RenderTmplFunc, Msg), + render_headers(HeadersTemplate2, RenderTmplFunc, Msg) ), BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), + Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg), #{ method => Method, path => Path, @@ -681,25 +684,26 @@ process_request( } = Conf, Msg ) -> + RenderTemplateFun = fun render_template/2, Conf#{ - method => make_method(render_template_string(MethodTemplate, Msg)), - path => unicode:characters_to_list(render_template(PathTemplate, Msg)), - body => render_request_body(BodyTemplate, Msg), - headers => render_headers(HeadersTemplate, Msg), + method => make_method(render_template_string(MethodTemplate, RenderTemplateFun, Msg)), + path => unicode:characters_to_list(RenderTemplateFun(PathTemplate, Msg)), + body => render_request_body(BodyTemplate, RenderTemplateFun, Msg), + headers => render_headers(HeadersTemplate, RenderTemplateFun, Msg), request_timeout => ReqTimeout }. -render_request_body(undefined, Msg) -> +render_request_body(undefined, _, Msg) -> emqx_utils_json:encode(Msg); -render_request_body(BodyTks, Msg) -> - render_template(BodyTks, Msg). +render_request_body(BodyTks, RenderTmplFunc, Msg) -> + RenderTmplFunc(BodyTks, Msg). -render_headers(HeaderTks, Msg) -> +render_headers(HeaderTks, RenderTmplFunc, Msg) -> lists:map( fun({K, V}) -> { - render_template_string(K, Msg), - render_template_string(emqx_secret:unwrap(V), Msg) + render_template_string(K, RenderTmplFunc, Msg), + render_template_string(emqx_secret:unwrap(V), RenderTmplFunc, Msg) } end, HeaderTks @@ -710,8 +714,8 @@ render_template(Template, Msg) -> {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}), String. -render_template_string(Template, Msg) -> - unicode:characters_to_binary(render_template(Template, Msg)). +render_template_string(Template, RenderTmplFunc, Msg) -> + unicode:characters_to_binary(RenderTmplFunc(Template, Msg)). make_method(M) when M == <<"POST">>; M == <<"post">> -> post; make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index f5d0f3c02..8ad11f05b 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -56,6 +56,12 @@ config_routing.desc: config_routing.label: """Routing""" +config_doc_as_upsert.desc: +"""Instead of sending a partial doc plus an upsert doc, +you can set doc_as_upsert to true to use the contents of doc as the upsert value.""" +config_doc_as_upsert.label: +"""doc_as_upsert""" + config_wait_for_active_shards.desc: """The number of shard copies that must be active before proceeding with the operation. Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1). From dae835635cacdb7967a6d64a6b2b48a89d629e2c Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Thu, 18 Jan 2024 16:23:02 +0800 Subject: [PATCH 4/4] fix: don't crash in http SUITE --- apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl | 5 +++-- apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl | 2 +- changes/feat-12348.en.md | 1 + scripts/spellcheck/dicts/emqx.txt | 2 ++ 4 files changed, 7 insertions(+), 3 deletions(-) create mode 100644 changes/feat-12348.en.md diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 81acec602..a148a4d16 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -38,6 +38,7 @@ ]). -export([reply_delegator/3]). +-export([render_template/2]). -export([ roots/0, @@ -266,8 +267,8 @@ on_add_channel( ) -> InstalledActions = maps:get(installed_actions, OldState, #{}), {ok, ActionState} = do_create_http_action(ActionConfig), - RenderTemplate = maps:get(render_template_func, ActionConfig, fun render_template/2), - ActionState1 = ActionState#{render_template_func => RenderTemplate}, + RenderTmplFunc = maps:get(render_template_func, ActionConfig, fun ?MODULE:render_template/2), + ActionState1 = ActionState#{render_template_func => RenderTmplFunc}, NewInstalledActions = maps:put(ActionId, ActionState1, InstalledActions), NewState = maps:put(installed_actions, NewInstalledActions, OldState), {ok, NewState}. diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 3b7303300..932191ec5 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -146,7 +146,7 @@ end_per_testcase(_TestCase, Config) -> %%------------------------------------------------------------------------------ %% HTTP server for testing -%% (Orginally copied from emqx_bridge_api_SUITE) +%% (Originally copied from emqx_bridge_api_SUITE) %%------------------------------------------------------------------------------ start_http_server(HTTPServerConfig) -> process_flag(trap_exit, true), diff --git a/changes/feat-12348.en.md b/changes/feat-12348.en.md new file mode 100644 index 000000000..cf84bf5e2 --- /dev/null +++ b/changes/feat-12348.en.md @@ -0,0 +1 @@ +Added support for Elasticsearch Bridge. diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 1d98d82db..c2f5f54ef 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -299,3 +299,5 @@ now_us ns elasticsearch ElasticSearch +doc_as_upsert +upsert