From 7f5fe9190565157cb381c75a72e0e4127fa94b8c Mon Sep 17 00:00:00 2001 From: zhongwencool Date: Wed, 17 Jan 2024 17:14:28 +0800 Subject: [PATCH] fix: es's action is atom not binary --- .../docker-compose-elastic-search-tls.yaml | 101 ++++++++++++++++++ apps/emqx_bridge_es/src/emqx_bridge_es.erl | 6 +- .../src/emqx_bridge_es_connector.erl | 5 +- .../src/emqx_bridge_iotdb_connector.erl | 4 +- 4 files changed, 111 insertions(+), 5 deletions(-) create mode 100644 .ci/docker-compose-file/docker-compose-elastic-search-tls.yaml diff --git a/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml new file mode 100644 index 000000000..fef93d785 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml @@ -0,0 +1,101 @@ +version: "3.9" + +services: + setup: + image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/elasticsearch/config/certs + user: "0" + command: > + bash -c ' + if [ x${ELASTIC_PASSWORD} == x ]; then + echo "Set the ELASTIC_PASSWORD environment variable in the .env file"; + exit 1; + elif [ x${KIBANA_PASSWORD} == x ]; then + echo "Set the KIBANA_PASSWORD environment variable in the .env file"; + exit 1; + fi; + echo "Setting file permissions" + chown -R root:root config/certs; + find . -type d -exec chmod 750 \{\} \;; + find . -type f -exec chmod 640 \{\} \;; + echo "Waiting for Elasticsearch availability"; + until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done; + echo "Setting kibana_system password"; + until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done; + echo "All done!"; + ' + healthcheck: + test: ["CMD-SHELL", "[ -f config/certs/ca/ca.crt ]"] + interval: 1s + timeout: 5s + retries: 120 + + es01: + depends_on: + setup: + condition: service_healthy + image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/elasticsearch/config/certs + - esdata01:/usr/share/elasticsearch/data + ports: + - 9200:9200 + environment: + - node.name=es01 + - ELASTIC_PASSWORD=${ELASTIC_PASSWORD} + - bootstrap.memory_lock=true + - discovery.type=single-node + - xpack.security.enabled=true + - xpack.security.http.ssl.enabled=true + - xpack.security.http.ssl.key=certs/es01/es01.key + - xpack.security.http.ssl.certificate=certs/es01/es01.crt + - xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt + - xpack.license.self_generated.type=${LICENSE} + mem_limit: 1073741824 + ulimits: + memlock: + soft: -1 + hard: -1 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'", + ] + interval: 10s + timeout: 10s + retries: 120 + + kibana: + depends_on: + es01: + condition: service_healthy + image: public.ecr.aws/elastic/kibana:${ELASTIC_TAG} + volumes: + - ./elastic:/usr/share/kibana/config/certs + - kibanadata:/usr/share/kibana/data + ports: + - 5601:5601 + environment: + - SERVERNAME=kibana + - ELASTICSEARCH_HOSTS=https://es01:9200 + - ELASTICSEARCH_USERNAME=kibana_system + - ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD} + - ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt + mem_limit: 1073741824 + healthcheck: + test: + [ + "CMD-SHELL", + "curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'", + ] + interval: 10s + timeout: 10s + retries: 120 + +volumes: + esdata01: + driver: local + kibanadata: + driver: local diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es.erl b/apps/emqx_bridge_es/src/emqx_bridge_es.erl index b575f32ed..9975eb1b1 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es.erl @@ -98,10 +98,14 @@ action_union_member_selector({value, Value}) -> [?R_REF(action_delete)]; #{<<"action">> := <<"update">>} -> [?R_REF(action_update)]; - _ -> + #{<<"action">> := Action} when is_atom(Action) -> + Value1 = Value#{<<"action">> => atom_to_binary(Action)}, + action_union_member_selector({value, Value1}); + Actual -> Expected = "create | delete | update", throw(#{ field_name => action, + actual => Actual, expected => Expected }) end. diff --git a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl index fe86eac56..7e49aeb55 100644 --- a/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl +++ b/apps/emqx_bridge_es/src/emqx_bridge_es_connector.erl @@ -200,7 +200,7 @@ on_start(InstanceId, Config) -> ?SLOG(info, #{ msg => "elasticsearch_bridge_started", instance_id => InstanceId, - request => maps:get(request, State, <<>>) + request => emqx_utils:redact(maps:get(request, State, <<>>)) }), ?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}), {ok, State#{channels => #{}}}; @@ -208,7 +208,7 @@ on_start(InstanceId, Config) -> ?SLOG(error, #{ msg => "failed_to_start_elasticsearch_bridge", instance_id => InstanceId, - request => maps:get(request, Config, <<>>), + request => emqx_utils:redact(maps:get(request, Config, <<>>)), reason => Reason }), throw(failed_to_start_elasticsearch_bridge) @@ -354,6 +354,7 @@ add_query_string(Keys, Param0) -> end. to_str(List) when is_list(List) -> List; +to_str(Bin) when is_binary(Bin) -> binary_to_list(Bin); to_str(false) -> "false"; to_str(true) -> "true"; to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom). diff --git a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl index d7b45c18d..ccf97f143 100644 --- a/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl +++ b/apps/emqx_bridge_iotdb/src/emqx_bridge_iotdb_connector.erl @@ -212,7 +212,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) -> ?SLOG(info, #{ msg => "iotdb_bridge_started", instance_id => InstanceId, - request => maps:get(request, State, <<>>) + request => emqx_utils:redact(maps:get(request, State, <<>>)) }), ?tp(iotdb_bridge_started, #{instance_id => InstanceId}), {ok, State#{iotdb_version => Version, channels => #{}}}; @@ -220,7 +220,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) -> ?SLOG(error, #{ msg => "failed_to_start_iotdb_bridge", instance_id => InstanceId, - request => maps:get(request, Config, <<>>), + request => emqx_utils:redact(maps:get(request, Config, <<>>)), reason => Reason }), throw(failed_to_start_iotdb_bridge)