diff --git a/.ci/docker-compose-file/.env b/.ci/docker-compose-file/.env index 3be2b7415..73ec47d00 100644 --- a/.ci/docker-compose-file/.env +++ b/.ci/docker-compose-file/.env @@ -16,4 +16,13 @@ HSTREAMDB_ZK_TAG=3.8.1 MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server SQLSERVER_TAG=2019-CU19-ubuntu-20.04 + +# Password for the 'elastic' user (at least 6 characters) +ELASTIC_PASSWORD="emqx123" +# Password for the 'kibana_system' user (at least 6 characters) +KIBANA_PASSWORD="emqx123" +# Version of Elastic products +ELASTIC_TAG=8.11.4 +LICENSE=basic + TARGET=emqx/emqx diff --git a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml new file mode 100644 index 000000000..50491a88a --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml @@ -0,0 +1,109 @@ +version: "3.9" + +services: + setup: + image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/elasticsearch/config/certs + user: "0" + command: > + bash -c ' + if [ x${ELASTIC_PASSWORD} == x ]; then + echo "Set the ELASTIC_PASSWORD environment variable in the .env file"; + exit 1; + elif [ x${KIBANA_PASSWORD} == x ]; then + echo "Set the KIBANA_PASSWORD environment variable in the .env file"; + exit 1; + fi; + echo "Setting file permissions" + chown -R root:root config/certs; + find . -type d -exec chmod 750 \{\} \;; + find . -type f -exec chmod 640 \{\} \;; + echo "Waiting for Elasticsearch availability"; + until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done; + echo "Setting kibana_system password"; + until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done; + echo "All done!"; + ' + healthcheck: + test: ["CMD-SHELL", "[ -f config/certs/ca/ca.crt ]"] + interval: 1s + timeout: 5s + retries: 120 + + es01: + depends_on: + setup: + condition: service_healthy + image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + container_name: elasticsearch + hostname: elasticsearch + volumes: + - ./elastic:/usr/share/elasticsearch/config/certs + - esdata01:/usr/share/elasticsearch/data + ports: + - 9200:9200 + environment: + - node.name=es01 + - ELASTIC_PASSWORD=${ELASTIC_PASSWORD} + - bootstrap.memory_lock=true + - discovery.type=single-node + - xpack.security.enabled=true + - xpack.security.http.ssl.enabled=true + - xpack.security.http.ssl.key=certs/es01/es01.key + - xpack.security.http.ssl.certificate=certs/es01/es01.crt + - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt + - xpack.license.self_generated.type=${LICENSE} + mem_limit: 1073741824 + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'", + ] + interval: 10s + timeout: 10s + retries: 120 + restart: always + networks: + - emqx_bridge + + kibana: + depends_on: + es01: + condition: service_healthy + image: public.ecr.aws/elastic/kibana:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/kibana/config/certs + - kibanadata:/usr/share/kibana/data + ports: + - 5601:5601 + environment: + - SERVERNAME=kibana + - ELASTICSEARCH_HOSTS=https://es01:9200 + - ELASTICSEARCH_USERNAME=kibana_system + - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD} + - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt + mem_limit: 1073741824 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'", + ] + interval: 10s + timeout: 10s + retries: 120 + restart: always + networks: + - emqx_bridge + +volumes: + esdata01: + driver: local + kibanadata: + driver: local diff --git a/.ci/docker-compose-file/elastic/ca/ca.crt b/.ci/docker-compose-file/elastic/ca/ca.crt new file mode 100644 index 000000000..2d47d555c --- /dev/null +++ b/.ci/docker-compose-file/elastic/ca/ca.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB +CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu +ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG +A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM +pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14 +cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS +fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq +0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt +MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ +AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME +GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG +SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs +hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X +zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L +mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4 +iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK +cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p +-----END CERTIFICATE----- diff --git a/.ci/docker-compose-file/elastic/ca/ca.key b/.ci/docker-compose-file/elastic/ca/ca.key new file mode 100644 index 000000000..72786207a --- /dev/null +++ b/.ci/docker-compose-file/elastic/ca/ca.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAstJ8IhLq1JCDxTC1dcHlTKT5PwJcGVM6o5Hi+OILGOd1yPUN +OcP7C/buj88K14EwhVjV8yPfjKALafKYT5J9eHJoOf97C3vWHBVRY2CvzKJY5VGz +w4/uHYJPJfsORUUotPiTeZK1Ju62JPZaIwJHkn1/EVO3pChm4wVyBbT8V+RjWnbv +UYnUeRpBzN166ACKK4rPGxTszIBZ/Ysm2J5IatEvh6b9APBVcc5XtCjWz6bCJMVx +9g5z9oj0w8pvcSTZfSL/TD7Y/V2mph+05l3ErTG45ALnSrAsOG0boR6JzNegyFK/ +LF+VHEXAFsidcdaqWWjszHYdlXWABzbjrGcMyQIDAQABAoIBAAZOLXYanmjpIRpX +h7h7oikYEplWDRcQBBvvKZaOyuchhznTKTiZmF0xQ3Ny8J4Ndj9ndODWSZxI6uod +FaGNp+qytwnfgDBVGSVDm6tyRfSkX1fTsA/j3/iupvmO/w9yezdZYgLaCVTyex31 +yVMdchZgYjYDUpEBYzJbV2xL18+GBRmmPjdXumlpcJqcclxjOQJSu/1WCGVfn/e/ +64NQpAm7NSKLqeUl32g0/DvUpmYRfmf7ZjVUjePaJQU6sw5/N+3V9F1hYs8VSWz0 +OMzYIfUcvixw+VWx5bu0nWt98FirhsQPjCTThD+DHP6koXGrdXpeMOQE1YZmoV5T +vP0X+FECgYEA5dsKVDQFL67muqz3CNRVM0xDWACCoa8789hYoxvhd1iO3e4kwXBa +ABPcZckioq+HiQ4UIxC2AhQ1FuTeIUTq7LZ0HtAAdKFi48U4LzmPhNUpG1E/HbJ3 +GQbi4u1cAzGYuhdywktgBhn9bJ4XB7+X3815Y9qKkuRcwtXgKGDy8HkCgYEAxyly +vc7NBkLfIAmkOsm6VXfvfBTEUBUGi6+k1rarTUxWFIgRuk4FHywwWUTdxWBKJz3n +HNNJb/g7CcufdhLTuWVHQtJDxYf2cJjoi+Kf7/i/Qs9Nyhokj5Mnh6KlZQOWXpZd +Gwn/O13NeDxt1TIVO2xp6zY4FhVEPvaHuxsMCtECgYA7/eR/P6iO3nZoCJbdXhXy +spftEw0FSCg8p53SzIcXUCzRrcM4HavP0181zb5VebzFP8Bvun/WoRGOLSPwyP0L +1T8Pf7huuGSIEERuxvY3dC8raxQvGxJMnOiA0/Ss/Lfg8hfIsEWashPb0pMuOYpZ +JlblgfejCSlQzOOZhlxB+QKBgQCKmizRLV9/0QAJAsy5YPR9UJdpCebJOKiyg806 +5Ct5AvwRE9UKjAuCczU+mu+f0fApOSpi5CQCeYVUvtG90UJpjrM2LLCfgoyeNbv4 +xgG6dqlcbHrdgK4bATUMbsOd9g4qy4gGLkHi5df9qkhhi5Y9Iajg2X3U2H4DN3yk +WSFbUQKBgQCLz333qWOuT3OBv+EYxHDQUS4YG+dReUos+v0iPJzu+spnfibBF5IC +RjHIhPsdN1byNB0naXOkkz4tUlLGXv6umFgDtQvy/2rxvxQmUGp/WY1VM2+164Xe +NEWdMEU6UckCoMO77kw8JosKhmXCYaSW5bWwnXuEpOj9WWpwjKtxlA== +-----END RSA PRIVATE KEY----- diff --git a/.ci/docker-compose-file/elastic/es01/es01.crt b/.ci/docker-compose-file/elastic/es01/es01.crt new file mode 100644 index 000000000..002f5727b --- /dev/null +++ b/.ci/docker-compose-file/elastic/es01/es01.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDQDCCAiigAwIBAgIUe90yOBN1KBxOEr2jro3epamZksIwDQYJKoZIhvcNAQEL +BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l +cmF0ZWQgQ0EwHhcNMjQwMTE2MDIzMjIyWhcNMjcwMTE1MDIzMjIyWjAPMQ0wCwYD +VQQDEwRlczAxMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxGEL71pV +j8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/26QuYAkoYL+WuJNfYjg5F/O8W +VVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8mAeqMQPD+4jitOwjOsYzbuFCb +nYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59pHICxS7Cq304LDTRQbNoT8HO +4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8g5tSyfQUDCoEKtTOc3Pe5zCB +vIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r8e9GYmLY/NhxEn3dWTqRhHeg +UD13O8o1aBWonwIDAQABo28wbTAdBgNVHQ4EFgQUXvGJtSf2/mLOK17AzUridtCV +xWwwHwYDVR0jBBgwFoAUwMg5afDWXz3vDQhlZRyUlXJRjoUwIAYDVR0RBBkwF4IJ +bG9jYWxob3N0hwR/AAABggRlczAxMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQAD +ggEBACaNq3ZqrbsGvbEtrf6kJGIsTokTFHeVJUSYmt1ZZzDFLSepXAC/J8gphV45 +B+YSlkDPNTwMYlf7TUYY872zkdqOXN9r0NUx8MzVAX0+rux0RJba5GGUvJGZDNMX +WM5z9ry1KjQSQ1bSoRQOD3QArmBmhvikHjLc97Vqt56N0wA/ztXWOpNZX/TXmast +aXlUbcfQE73Cdq9tW1ATXwbQ2Gf7vVAUT3zjZSZbNdgPuBicGJHf85Fhjm2ND4+R +sjLIOQ2YgVxNHYbueScc6lJM5RNK194K7WrEQnRyGHT3NaDUm0FFNl//aQeq1ZVw +6gaUYlkTFauXwEYMDK901cWFaBE= +-----END CERTIFICATE----- diff --git a/.ci/docker-compose-file/elastic/es01/es01.key b/.ci/docker-compose-file/elastic/es01/es01.key new file mode 100644 index 000000000..b401c5376 --- /dev/null +++ b/.ci/docker-compose-file/elastic/es01/es01.key @@ -0,0 +1,27 @@ +-----BEGIN RSA PRIVATE KEY----- +MIIEpAIBAAKCAQEAxGEL71pVj8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/2 +6QuYAkoYL+WuJNfYjg5F/O8WVVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8m +AeqMQPD+4jitOwjOsYzbuFCbnYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59 +pHICxS7Cq304LDTRQbNoT8HO4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8 +g5tSyfQUDCoEKtTOc3Pe5zCBvIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r +8e9GYmLY/NhxEn3dWTqRhHegUD13O8o1aBWonwIDAQABAoIBADJ3A/Om4az5dcce +96EBU9q+IDBBh2Wr1wzSk9p3sqoM47fLqH5b4dzYwJ1yZw2FwFtFFLw6jqExyexE +7JY8gyAFwPZyJ3pKQHuX1gQuRlYxchB9quU8Kn230LA+w1mT2lXrLj2PzWWvAsAv +m837KiFMpP0O5EjB07u8kLsRr1mG6QQ24Kc8oxd7xLXIiPzSvsOpYwo9hmIWENd5 +kyA7oSa9EmN3TRTkKOHI7cFQ3DqIGdO71waUofKOdx39DyHS2YKWxDE/LUjkS9zw +1AyZG09l4uowyLRqwYhivEq9Za6rdc64yheuHatAM9kC2AOcVcsCPZquIe90k4t1 +L7e9CAECgYEA1W483xTW8ngzxv9MMuPiW+PwVGRpyQrbO6OZOxdWEYfhrZlk5wlW +XK2T85jqooJwMWPTk1F49vZ9WN2KuLkL65GlkEtkFbxmOiFJjXuWwycbFSk05hPs +4AESBYHieaSPcwYhvLeG6g4PFyeqmbAGnKsJaj2ylPwDBOc7LgVlqAECgYEA64wo +gZwaj5SlP8M/OqGH04UVYr1kP/Eq6eiDfMyV5exy+pyzofZyNKUfJfw6sGgyRRHx +OVxlnPMsZ8zbdOXsvUEIeavpwDfQcp5eAURL65I6GMLsx2QpfiN2mDe1MqQW0jct +UleFaURgS84KHLE0+tBBg906jOHGjsE7Q3lyUJ8CgYBYYPev4K9JZGD8bEcfY6Ie +Lvsb1yC+8VHrFkmjYHxxcfUPr89KpGEwq2fynUW72YufyBiajkgq69Ln84U4DNhU +ydDnOXDOV191fsc4YQ8C7LSYRKH1DBcwgwD1at1fRbdpCAb8YHrrfLre+bv5PBzg +zyps5fOHIfwWEbI90lpQAQKBgQDoMMqBMTtxi+r1lucOScrVtFuncOCQs5BE8cIj +1JxzAQk6iBv/LSvZP2gcDq5f1Oaw9YXfsHguJfwA+ozeiAQ9bw0Gu3N52sstIXWz +M/rO5d9FJ2k3CEJqqFSwqkGBAQXKBUA06jeF1DREpX+MVxbNo1rhvMOJusn7UPm1 +gtMwKwKBgQCfRzFO10ITwrw8rcRZwO9Axgqf11V7xn6qpgRxj4h0HOErVTCN1H0b +vE3Pz7cxS/g9vFRP37TuqBLfGVzPt9LAEFwCWPeZJLROBLHyu8XrhTbQx+sI2/pe +SBEJAQAHtYasFTE0sBEKNEY2rIt1c29XZhyhhtNKD9gRN/gB355wLg== +-----END RSA PRIVATE KEY----- diff --git a/.ci/docker-compose-file/elastic/instances.yml b/.ci/docker-compose-file/elastic/instances.yml new file mode 100644 index 000000000..bf1ad4102 --- /dev/null +++ b/.ci/docker-compose-file/elastic/instances.yml @@ -0,0 +1,7 @@ +instances: + - name: es01 + dns: + - es01 + - localhost + ip: + - 127.0.0.1 diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json index 4b2b6ccf2..c58474039 100644 --- a/.ci/docker-compose-file/toxiproxy.json +++ b/.ci/docker-compose-file/toxiproxy.json @@ -191,5 +191,11 @@ "listen": "0.0.0.0:636", "upstream": "ldap:636", "enabled": true + }, + { + "name": "elasticsearch", + "listen": "0.0.0.0:9200", + "upstream": "elasticsearch:9200", + "enabled": true } ] diff --git a/apps/emqx_bridge_es/docker-ct b/apps/emqx_bridge_es/docker-ct index 80f0d394b..f7cd9ab28 100644 --- a/apps/emqx_bridge_es/docker-ct +++ b/apps/emqx_bridge_es/docker-ct @@ -1 +1,2 @@ toxiproxy +elasticsearch diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index b575f32ed..032439574 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -59,7 +59,7 @@ fields(action_create) -> action(create), index(), id(false), - doc(true), + doc(), routing(), require_alias(), overwrite() @@ -72,7 +72,8 @@ fields(action_update) -> action(update), index(), id(true), - doc(true), + doc(), + doc_as_upsert(), routing(), require_alias() | http_common_opts() @@ -98,10 +99,14 @@ action_union_member_selector({value, Value}) -> [?R_REF(action_delete)]; #{<<"action">> := <<"update">>} -> [?R_REF(action_update)]; - _ -> + #{<<"action">> := Action} when is_atom(Action) -> + Value1 = Value#{<<"action">> => atom_to_binary(Action)}, + action_union_member_selector({value, Value1}); + Actual -> Expected = "create | delete | update", throw(#{ field_name => action, + actual => Actual, expected => Expected }) end. @@ -149,12 +154,12 @@ id(Required) -> } )}. -doc(Required) -> +doc() -> {doc, ?HOCON( binary(), #{ - required => Required, + required => false, example => <<"${payload.doc}">>, desc => ?DESC("config_parameters_doc") } @@ -168,6 +173,17 @@ http_common_opts() -> emqx_bridge_http_schema:fields("parameters_opts") ). +doc_as_upsert() -> + {doc_as_upsert, + ?HOCON( + boolean(), + #{ + required => false, + default => false, + desc => ?DESC("config_doc_as_upsert") + } + )}. + routing() -> {routing, ?HOCON( diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index fe86eac56..8b68af10f 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -33,6 +33,8 @@ connector_example_values/0 ]). +-export([render_template/2]). + %% emqx_connector_resource behaviour callbacks -export([connector_config/2]). @@ -200,7 +202,7 @@ on_start(InstanceId, Config) -> ?SLOG(info, #{ msg => "elasticsearch_bridge_started", instance_id => InstanceId, - request => maps:get(request, State, <<>>) + request => emqx_utils:redact(maps:get(request, State, <<>>)) }), ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}), {ok, State#{channels => #{}}}; @@ -208,7 +210,7 @@ on_start(InstanceId, Config) -> ?SLOG(error, #{ msg => "failed_to_start_elasticsearch_bridge", instance_id => InstanceId, - request => maps:get(request, Config, <<>>), + request => emqx_utils:redact(maps:get(request, Config, <<>>)), reason => Reason }), throw(failed_to_start_elasticsearch_bridge) @@ -286,8 +288,12 @@ on_add_channel( method => method(Parameter), body => get_body_template(Parameter) }, + ChannelConfig = #{ + parameters => Parameter1, + render_template_func => fun ?MODULE:render_template/2 + }, {ok, State} = emqx_bridge_http_connector:on_add_channel( - InstanceId, State0, ChannelId, #{parameters => Parameter1} + InstanceId, State0, ChannelId, ChannelConfig ), Channel = Parameter1, Channels2 = Channels#{ChannelId => Channel}, @@ -310,9 +316,23 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) -> {error, not_exists} end. +render_template(Template, Msg) -> + % Ignoring errors here, undefined bindings will be replaced with empty string. + Opts = #{var_trans => fun to_string/2}, + {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}, Opts), + String. + %%-------------------------------------------------------------------- %% Internal Functions %%-------------------------------------------------------------------- + +to_string(Name, Value) -> + emqx_template:to_string(render_var(Name, Value)). +render_var(_, undefined) -> + % NOTE Any allowed but undefined binding will be replaced with empty string + <<>>; +render_var(_Name, Value) -> + Value. %% delete DELETE //_doc/<_id> path(#{action := delete, id := Id, index := Index} = Action) -> BasePath = ["/", Index, "/_doc/", Id], @@ -354,6 +374,7 @@ add_query_string(Keys, Param0) -> end. to_str(List) when is_list(List) -> List; +to_str(Bin) when is_binary(Bin) -> binary_to_list(Bin); to_str(false) -> "false"; to_str(true) -> "true"; to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). @@ -369,5 +390,12 @@ handle_response({ok, Code, Body}) -> handle_response({error, _} = Error) -> Error. -get_body_template(#{doc := Doc}) -> Doc; -get_body_template(_) -> undefined. +get_body_template(#{action := update, doc := Doc} = Template) -> + case maps:get(doc_as_upsert, Template, false) of + false -> <<"{\"doc\":", Doc/binary, "}">>; + true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">> + end; +get_body_template(#{doc := Doc}) -> + Doc; +get_body_template(_) -> + undefined. diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl new file mode 100644 index 000000000..a9ff70957 --- /dev/null +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE.erl @@ -0,0 +1,379 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved. +%% +%% Licensed under the Apache License, Version 2.0 (the "License"); +%% you may not use this file except in compliance with the License. +%% You may obtain a copy of the License at +%% http://www.apache.org/licenses/LICENSE-2.0 +%% +%% Unless required by applicable law or agreed to in writing, software +%% distributed under the License is distributed on an "AS IS" BASIS, +%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +%% See the License for the specific language governing permissions and +%% limitations under the License. +%%-------------------------------------------------------------------- + +-module(emqx_bridge_es_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). +-include_lib("emqx_resource/include/emqx_resource.hrl"). + +-import(emqx_common_test_helpers, [on_exit/1]). + +-define(TYPE, elasticsearch). +-define(CA, "es.crt"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + emqx_common_test_helpers:all(?MODULE). + +init_per_suite(Config) -> + ProxyName = "elasticsearch", + ESHost = os:getenv("ELASTICSEARCH_HOST", "elasticsearch"), + ESPort = list_to_integer(os:getenv("ELASTICSEARCH_PORT", "9200")), + Apps = emqx_cth_suite:start( + [ + emqx, + emqx_conf, + emqx_connector, + emqx_bridge_es, + emqx_bridge, + emqx_rule_engine, + emqx_management, + {emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"} + ], + #{work_dir => emqx_cth_suite:work_dir(Config)} + ), + {ok, _} = emqx_common_test_http:create_default_app(), + wait_until_elasticsearch_is_up(ESHost, ESPort), + [ + {apps, Apps}, + {proxy_name, ProxyName}, + {es_host, ESHost}, + {es_port, ESPort} + | Config + ]. + +es_checks() -> + case os:getenv("IS_CI") of + "yes" -> 10; + _ -> 1 + end. + +wait_until_elasticsearch_is_up(Host, Port) -> + wait_until_elasticsearch_is_up(es_checks(), Host, Port). + +wait_until_elasticsearch_is_up(0, Host, Port) -> + throw({{Host, Port}, not_available}); +wait_until_elasticsearch_is_up(Count, Host, Port) -> + timer:sleep(1000), + case emqx_common_test_helpers:is_all_tcp_servers_available([{Host, Port}]) of + true -> ok; + false -> wait_until_elasticsearch_is_up(Count - 1, Host, Port) + end. + +end_per_suite(Config) -> + Apps = ?config(apps, Config), + %ProxyHost = ?config(proxy_host, Config), + %ProxyPort = ?config(proxy_port, Config), + %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_cth_suite:stop(Apps), + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + %ProxyHost = ?config(proxy_host, Config), + %ProxyPort = ?config(proxy_port, Config), + %emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), + emqx_common_test_helpers:call_janitor(60_000), + ok. + +%%------------------------------------------------------------------------------------- +%% Helper fns +%%------------------------------------------------------------------------------------- + +check_send_message_with_action(Topic, ActionName, ConnectorName) -> + send_message(Topic), + %% ###################################### + %% Check if message is sent to es + %% ###################################### + timer:sleep(500), + check_action_metrics(ActionName, ConnectorName). + +send_message(Topic) -> + Now = emqx_utils_calendar:now_to_rfc3339(microsecond), + Doc = #{<<"name">> => <<"emqx">>, <<"release_date">> => Now}, + Index = <<"emqx-test-index">>, + Payload = emqx_utils_json:encode(#{doc => Doc, index => Index}), + + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + {ok, Client} = emqtt:start_link([{clientid, ClientId}, {port, 1883}]), + {ok, _} = emqtt:connect(Client), + ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]), + ok. + +check_action_metrics(ActionName, ConnectorName) -> + ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName), + Metrics = + #{ + match => emqx_resource_metrics:matched_get(ActionId), + success => emqx_resource_metrics:success_get(ActionId), + failed => emqx_resource_metrics:failed_get(ActionId), + queuing => emqx_resource_metrics:queuing_get(ActionId), + dropped => emqx_resource_metrics:dropped_get(ActionId) + }, + ?assertEqual( + #{ + match => 1, + success => 1, + dropped => 0, + failed => 0, + queuing => 0 + }, + Metrics, + {ActionName, ConnectorName, ActionId} + ). + +action_config(ConnectorName) -> + action_config(ConnectorName, _Overrides = #{}). + +action_config(ConnectorName, Overrides) -> + Cfg0 = action(ConnectorName), + emqx_utils_maps:deep_merge(Cfg0, Overrides). + +action(ConnectorName) -> + #{ + <<"description">> => <<"My elasticsearch test action">>, + <<"enable">> => true, + <<"parameters">> => #{ + <<"index">> => <<"${payload.index}">>, + <<"action">> => <<"create">>, + <<"doc">> => <<"${payload.doc}">>, + <<"overwrite">> => true + }, + <<"connector">> => ConnectorName, + <<"resource_opts">> => #{ + <<"health_check_interval">> => <<"30s">>, + <<"query_mode">> => <<"sync">> + } + }. + +base_url(Config) -> + Host = ?config(es_host, Config), + Port = ?config(es_port, Config), + iolist_to_binary([ + "https://", + Host, + ":", + integer_to_binary(Port) + ]). + +connector_config(Config) -> + connector_config(_Overrides = #{}, Config). + +connector_config(Overrides, Config) -> + Defaults = + #{ + <<"base_url">> => base_url(Config), + <<"enable">> => true, + <<"authentication">> => #{ + <<"password">> => <<"emqx123">>, + <<"username">> => <<"elastic">> + }, + <<"description">> => <<"My elasticsearch test connector">>, + <<"connect_timeout">> => <<"15s">>, + <<"pool_size">> => 2, + <<"pool_type">> => <<"random">>, + <<"enable_pipelining">> => 100, + <<"ssl">> => #{ + <<"enable">> => true, + <<"hibernate_after">> => <<"5s">>, + <<"cacertfile">> => filename:join(?config(data_dir, Config), ?CA) + } + }, + emqx_utils_maps:deep_merge(Defaults, Overrides). + +create_connector(Name, Config) -> + Res = emqx_connector:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end), + Res. + +create_action(Name, Config) -> + Res = emqx_bridge_v2:create(?TYPE, Name, Config), + on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end), + Res. + +action_api_spec_props_for_get() -> + #{ + <<"bridge_elasticsearch.get_bridge_v2">> := + #{<<"properties">> := Props} + } = + emqx_bridge_v2_testlib:actions_api_spec_schemas(), + Props. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_create_remove_list(Config) -> + [] = emqx_bridge_v2:list(), + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector, ConnectorConfig), + ActionConfig = action(<<"test_connector">>), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + [ActionInfo] = emqx_bridge_v2:list(), + #{ + name := <<"test_action_1">>, + type := <<"elasticsearch">>, + raw_config := _, + status := connected + } = ActionInfo, + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig), + 2 = length(emqx_bridge_v2:list()), + ok = emqx_bridge_v2:remove(?TYPE, test_action_1), + 1 = length(emqx_bridge_v2:list()), + ok = emqx_bridge_v2:remove(?TYPE, test_action_2), + [] = emqx_bridge_v2:list(), + emqx_connector:remove(?TYPE, test_connector), + ok. + +%% Test sending a message to a bridge V2 +t_send_message(Config) -> + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig), + ActionConfig = action(<<"test_connector2">>), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig), + Rule = #{ + id => <<"rule:t_es">>, + sql => <<"SELECT\n *\nFROM\n \"es/#\"">>, + actions => [<<"elasticsearch:test_action_1">>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule), + %% Use the action to send a message + check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2), + %% Create a few more bridges with the same connector and test them + ActionNames1 = + lists:foldl( + fun(I, Acc) -> + Seq = integer_to_binary(I), + ActionNameStr = "test_action_" ++ integer_to_list(I), + ActionName = list_to_atom(ActionNameStr), + {ok, _} = emqx_bridge_v2:create(?TYPE, ActionName, ActionConfig), + Rule1 = #{ + id => <<"rule:t_es", Seq/binary>>, + sql => <<"SELECT\n *\nFROM\n \"es/", Seq/binary, "\"">>, + actions => [<<"elasticsearch:", (list_to_binary(ActionNameStr))/binary>>], + description => <<"sink doc to elasticsearch">> + }, + {ok, _} = emqx_rule_engine:create_rule(Rule1), + Topic = <<"es/", Seq/binary>>, + check_send_message_with_action(Topic, ActionName, test_connector2), + [ActionName | Acc] + end, + [], + lists:seq(2, 10) + ), + ActionNames = [test_action_1 | ActionNames1], + %% Remove all the bridges + lists:foreach( + fun(BridgeName) -> + ok = emqx_bridge_v2:remove(?TYPE, BridgeName) + end, + ActionNames + ), + emqx_connector:remove(?TYPE, test_connector2), + ok. + +%% Test that we can get the status of the bridge V2 +t_health_check(Config) -> + BridgeV2Config = action(<<"test_connector3">>), + ConnectorConfig = connector_config(Config), + {ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig), + {ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config), + #{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2), + %% Check behaviour when bridge does not exist + {error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2), + ok = emqx_connector:remove(?TYPE, test_connector3), + ok. + +t_bad_url(Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = action(<<"test_connector">>), + ConnectorConfig0 = connector_config(Config), + ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>}, + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, #{ + resource_data := + #{ + status := ?status_disconnected, + error := failed_to_start_elasticsearch_bridge + } + }}, + emqx_connector:lookup(?TYPE, ConnectorName) + ), + ?assertMatch({ok, #{status := ?status_disconnected}}, emqx_bridge_v2:lookup(?TYPE, ActionName)), + ok. + +t_parameters_key_api_spec(_Config) -> + ActionProps = action_api_spec_props_for_get(), + ?assertNot(is_map_key(<<"elasticsearch">>, ActionProps), #{action_props => ActionProps}), + ?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}), + ok. + +t_http_api_get(Config) -> + ConnectorName = <<"test_connector">>, + ActionName = <<"test_action">>, + ActionConfig = action(ConnectorName), + ConnectorConfig = connector_config(Config), + ?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)), + ?assertMatch({ok, _}, create_action(ActionName, ActionConfig)), + ?assertMatch( + {ok, + {{_, 200, _}, _, [ + #{ + <<"connector">> := ConnectorName, + <<"description">> := <<"My elasticsearch test action">>, + <<"enable">> := true, + <<"error">> := <<>>, + <<"name">> := ActionName, + <<"node_status">> := + [ + #{ + <<"node">> := _, + <<"status">> := <<"connected">>, + <<"status_reason">> := <<>> + } + ], + <<"parameters">> := + #{ + <<"action">> := <<"create">>, + <<"doc">> := <<"${payload.doc}">>, + <<"index">> := <<"${payload.index}">>, + <<"max_retries">> := 2, + <<"overwrite">> := true + }, + <<"resource_opts">> := #{<<"query_mode">> := <<"sync">>}, + <<"status">> := <<"connected">>, + <<"status_reason">> := <<>>, + <<"type">> := <<"elasticsearch">> + } + ]}}, + emqx_bridge_v2_testlib:list_bridges_api() + ), + ok. diff --git a/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt new file mode 100644 index 000000000..2d47d555c --- /dev/null +++ b/apps/emqx_bridge_es/test/emqx_bridge_es_SUITE_data/es.crt @@ -0,0 +1,20 @@ +-----BEGIN CERTIFICATE----- +MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB +CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu +ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG +A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew +ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM +pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14 +cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS +fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq +0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt +MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ +AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME +GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG +SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs +hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X +zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L +mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4 +iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK +cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p +-----END CERTIFICATE----- diff --git a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl index 8f54694e9..a148a4d16 100644 --- a/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl +++ b/apps/emqx_bridge_http/src/emqx_bridge_http_connector.erl @@ -38,6 +38,7 @@ ]). -export([reply_delegator/3]). +-export([render_template/2]). -export([ roots/0, @@ -266,7 +267,9 @@ on_add_channel( ) -> InstalledActions = maps:get(installed_actions, OldState, #{}), {ok, ActionState} = do_create_http_action(ActionConfig), - NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions), + RenderTmplFunc = maps:get(render_template_func, ActionConfig, fun ?MODULE:render_template/2), + ActionState1 = ActionState#{render_template_func => RenderTmplFunc}, + NewInstalledActions = maps:put(ActionId, ActionState1, InstalledActions), NewState = maps:put(installed_actions, NewInstalledActions, OldState), {ok, NewState}. @@ -631,9 +634,10 @@ parse_template(String) -> process_request_and_action(Request, ActionState, Msg) -> MethodTemplate = maps:get(method, ActionState), - Method = make_method(render_template_string(MethodTemplate, Msg)), - PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)), - PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)), + RenderTmplFunc = maps:get(render_template_func, ActionState), + Method = make_method(render_template_string(MethodTemplate, RenderTmplFunc, Msg)), + PathPrefix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, Request), Msg)), + PathSuffix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, ActionState), Msg)), Path = case PathSuffix of @@ -644,11 +648,11 @@ process_request_and_action(Request, ActionState, Msg) -> HeadersTemplate1 = maps:get(headers, Request), HeadersTemplate2 = maps:get(headers, ActionState), Headers = merge_proplist( - render_headers(HeadersTemplate1, Msg), - render_headers(HeadersTemplate2, Msg) + render_headers(HeadersTemplate1, RenderTmplFunc, Msg), + render_headers(HeadersTemplate2, RenderTmplFunc, Msg) ), BodyTemplate = maps:get(body, ActionState), - Body = render_request_body(BodyTemplate, Msg), + Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg), #{ method => Method, path => Path, @@ -681,25 +685,26 @@ process_request( } = Conf, Msg ) -> + RenderTemplateFun = fun render_template/2, Conf#{ - method => make_method(render_template_string(MethodTemplate, Msg)), - path => unicode:characters_to_list(render_template(PathTemplate, Msg)), - body => render_request_body(BodyTemplate, Msg), - headers => render_headers(HeadersTemplate, Msg), + method => make_method(render_template_string(MethodTemplate, RenderTemplateFun, Msg)), + path => unicode:characters_to_list(RenderTemplateFun(PathTemplate, Msg)), + body => render_request_body(BodyTemplate, RenderTemplateFun, Msg), + headers => render_headers(HeadersTemplate, RenderTemplateFun, Msg), request_timeout => ReqTimeout }. -render_request_body(undefined, Msg) -> +render_request_body(undefined, _, Msg) -> emqx_utils_json:encode(Msg); -render_request_body(BodyTks, Msg) -> - render_template(BodyTks, Msg). +render_request_body(BodyTks, RenderTmplFunc, Msg) -> + RenderTmplFunc(BodyTks, Msg). -render_headers(HeaderTks, Msg) -> +render_headers(HeaderTks, RenderTmplFunc, Msg) -> lists:map( fun({K, V}) -> { - render_template_string(K, Msg), - render_template_string(emqx_secret:unwrap(V), Msg) + render_template_string(K, RenderTmplFunc, Msg), + render_template_string(emqx_secret:unwrap(V), RenderTmplFunc, Msg) } end, HeaderTks @@ -710,8 +715,8 @@ render_template(Template, Msg) -> {String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}), String. -render_template_string(Template, Msg) -> - unicode:characters_to_binary(render_template(Template, Msg)). +render_template_string(Template, RenderTmplFunc, Msg) -> + unicode:characters_to_binary(RenderTmplFunc(Template, Msg)). make_method(M) when M == <<"POST">>; M == <<"post">> -> post; make_method(M) when M == <<"PUT">>; M == <<"put">> -> put; diff --git a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl index 3b7303300..932191ec5 100644 --- a/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl +++ b/apps/emqx_bridge_http/test/emqx_bridge_http_SUITE.erl @@ -146,7 +146,7 @@ end_per_testcase(_TestCase, Config) -> %%------------------------------------------------------------------------------ %% HTTP server for testing -%% (Orginally copied from emqx_bridge_api_SUITE) +%% (Originally copied from emqx_bridge_api_SUITE) %%------------------------------------------------------------------------------ start_http_server(HTTPServerConfig) -> process_flag(trap_exit, true), diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index d7b45c18d..ccf97f143 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -212,7 +212,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) -> ?SLOG(info, #{ msg => "iotdb_bridge_started", instance_id => InstanceId, - request => maps:get(request, State, <<>>) + request => emqx_utils:redact(maps:get(request, State, <<>>)) }), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), {ok, State#{iotdb_version => Version, channels => #{}}}; @@ -220,7 +220,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", instance_id => InstanceId, - request => maps:get(request, Config, <<>>), + request => emqx_utils:redact(maps:get(request, Config, <<>>)), reason => Reason }), throw(failed_to_start_iotdb_bridge) diff --git a/apps/emqx_connector/src/emqx_connector.erl b/apps/emqx_connector/src/emqx_connector.erl index 27dfcba2a..92cf9439e 100644 --- a/apps/emqx_connector/src/emqx_connector.erl +++ b/apps/emqx_connector/src/emqx_connector.erl @@ -150,6 +150,9 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) -> ok = emqx_connector_resource:remove(Type, Name), ?tp(connector_post_config_update_done, #{}), ok; + {error, not_found} -> + ?tp(connector_post_config_update_done, #{}), + ok; {ok, Channels} -> {error, {active_channels, Channels}} end; diff --git a/changes/feat-12348.en.md b/changes/feat-12348.en.md new file mode 100644 index 000000000..cf84bf5e2 --- /dev/null +++ b/changes/feat-12348.en.md @@ -0,0 +1 @@ +Added support for Elasticsearch Bridge. diff --git a/rel/i18n/emqx_bridge_es.hocon b/rel/i18n/emqx_bridge_es.hocon index 62778a712..8ad11f05b 100644 --- a/rel/i18n/emqx_bridge_es.hocon +++ b/rel/i18n/emqx_bridge_es.hocon @@ -56,6 +56,12 @@ config_routing.desc: config_routing.label: """Routing""" +config_doc_as_upsert.desc: +"""Instead of sending a partial doc plus an upsert doc, +you can set doc_as_upsert to true to use the contents of doc as the upsert value.""" +config_doc_as_upsert.label: +"""doc_as_upsert""" + config_wait_for_active_shards.desc: """The number of shard copies that must be active before proceeding with the operation. Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1). @@ -97,7 +103,7 @@ config_parameters_require_alias.label: """_require_alias""" config_parameters_doc.desc: -"""JSON document""" +"""JSON document. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc.""" config_parameters_doc.label: """doc""" diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 7959581a9..af04fd9ee 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -246,6 +246,9 @@ for dep in ${CT_DEPS}; do otel) FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' ) ;; + elasticsearch) + FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' ) + ;; *) echo "unknown_ct_dependency $dep" exit 1 diff --git a/scripts/spellcheck/dicts/emqx.txt b/scripts/spellcheck/dicts/emqx.txt index 1d98d82db..c2f5f54ef 100644 --- a/scripts/spellcheck/dicts/emqx.txt +++ b/scripts/spellcheck/dicts/emqx.txt @@ -299,3 +299,5 @@ now_us ns elasticsearch ElasticSearch +doc_as_upsert +upsert