Merge pull request #12348 from emqx/elasticsearch-e550

Elasticsearch e550
This commit is contained in:
zhongwencool 2024-01-18 18:06:48 +08:00 committed by GitHub
commit 1b432eadd8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
21 changed files with 722 additions and 33 deletions

View File

@ -16,4 +16,13 @@ HSTREAMDB_ZK_TAG=3.8.1
MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
SQLSERVER_TAG=2019-CU19-ubuntu-20.04
# Password for the 'elastic' user (at least 6 characters)
ELASTIC_PASSWORD="emqx123"
# Password for the 'kibana_system' user (at least 6 characters)
KIBANA_PASSWORD="emqx123"
# Version of Elastic products
ELASTIC_TAG=8.11.4
LICENSE=basic
TARGET=emqx/emqx

View File

@ -0,0 +1,109 @@
version: "3.9"
services:
setup:
image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG}
volumes:
- ./elastic:/usr/share/elasticsearch/config/certs
user: "0"
command: >
bash -c '
if [ x${ELASTIC_PASSWORD} == x ]; then
echo "Set the ELASTIC_PASSWORD environment variable in the .env file";
exit 1;
elif [ x${KIBANA_PASSWORD} == x ]; then
echo "Set the KIBANA_PASSWORD environment variable in the .env file";
exit 1;
fi;
echo "Setting file permissions"
chown -R root:root config/certs;
find . -type d -exec chmod 750 \{\} \;;
find . -type f -exec chmod 640 \{\} \;;
echo "Waiting for Elasticsearch availability";
until curl -s --cacert config/certs/ca/ca.crt https://es01:9200 | grep -q "missing authentication credentials"; do sleep 30; done;
echo "Setting kibana_system password";
until curl -s -X POST --cacert config/certs/ca/ca.crt -u "elastic:${ELASTIC_PASSWORD}" -H "Content-Type: application/json" https://es01:9200/_security/user/kibana_system/_password -d "{\"password\":\"${KIBANA_PASSWORD}\"}" | grep -q "^{}"; do sleep 10; done;
echo "All done!";
'
healthcheck:
test: ["CMD-SHELL", "[ -f config/certs/ca/ca.crt ]"]
interval: 1s
timeout: 5s
retries: 120
es01:
depends_on:
setup:
condition: service_healthy
image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG}
container_name: elasticsearch
hostname: elasticsearch
volumes:
- ./elastic:/usr/share/elasticsearch/config/certs
- esdata01:/usr/share/elasticsearch/data
ports:
- 9200:9200
environment:
- node.name=es01
- ELASTIC_PASSWORD=${ELASTIC_PASSWORD}
- bootstrap.memory_lock=true
- discovery.type=single-node
- xpack.security.enabled=true
- xpack.security.http.ssl.enabled=true
- xpack.security.http.ssl.key=certs/es01/es01.key
- xpack.security.http.ssl.certificate=certs/es01/es01.crt
- xpack.security.http.ssl.certificate_authorities=certs/ca/ca.crt
- xpack.license.self_generated.type=${LICENSE}
mem_limit: 1073741824
ulimits:
memlock:
soft: -1
hard: -1
healthcheck:
test:
[
"CMD-SHELL",
"curl -s --cacert config/certs/ca/ca.crt https://localhost:9200 | grep -q 'missing authentication credentials'",
]
interval: 10s
timeout: 10s
retries: 120
restart: always
networks:
- emqx_bridge
kibana:
depends_on:
es01:
condition: service_healthy
image: public.ecr.aws/elastic/kibana:${ELASTIC_TAG}
volumes:
- ./elastic:/usr/share/kibana/config/certs
- kibanadata:/usr/share/kibana/data
ports:
- 5601:5601
environment:
- SERVERNAME=kibana
- ELASTICSEARCH_HOSTS=https://es01:9200
- ELASTICSEARCH_USERNAME=kibana_system
- ELASTICSEARCH_PASSWORD=${KIBANA_PASSWORD}
- ELASTICSEARCH_SSL_CERTIFICATEAUTHORITIES=config/certs/ca/ca.crt
mem_limit: 1073741824
healthcheck:
test:
[
"CMD-SHELL",
"curl -s -I http://localhost:5601 | grep -q 'HTTP/1.1 302 Found'",
]
interval: 10s
timeout: 10s
retries: 120
restart: always
networks:
- emqx_bridge
volumes:
esdata01:
driver: local
kibanadata:
driver: local

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB
CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu
ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG
A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM
pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14
cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS
fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq
0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt
MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ
AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME
GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG
SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs
hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X
zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L
mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4
iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK
cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAstJ8IhLq1JCDxTC1dcHlTKT5PwJcGVM6o5Hi+OILGOd1yPUN
OcP7C/buj88K14EwhVjV8yPfjKALafKYT5J9eHJoOf97C3vWHBVRY2CvzKJY5VGz
w4/uHYJPJfsORUUotPiTeZK1Ju62JPZaIwJHkn1/EVO3pChm4wVyBbT8V+RjWnbv
UYnUeRpBzN166ACKK4rPGxTszIBZ/Ysm2J5IatEvh6b9APBVcc5XtCjWz6bCJMVx
9g5z9oj0w8pvcSTZfSL/TD7Y/V2mph+05l3ErTG45ALnSrAsOG0boR6JzNegyFK/
LF+VHEXAFsidcdaqWWjszHYdlXWABzbjrGcMyQIDAQABAoIBAAZOLXYanmjpIRpX
h7h7oikYEplWDRcQBBvvKZaOyuchhznTKTiZmF0xQ3Ny8J4Ndj9ndODWSZxI6uod
FaGNp+qytwnfgDBVGSVDm6tyRfSkX1fTsA/j3/iupvmO/w9yezdZYgLaCVTyex31
yVMdchZgYjYDUpEBYzJbV2xL18+GBRmmPjdXumlpcJqcclxjOQJSu/1WCGVfn/e/
64NQpAm7NSKLqeUl32g0/DvUpmYRfmf7ZjVUjePaJQU6sw5/N+3V9F1hYs8VSWz0
OMzYIfUcvixw+VWx5bu0nWt98FirhsQPjCTThD+DHP6koXGrdXpeMOQE1YZmoV5T
vP0X+FECgYEA5dsKVDQFL67muqz3CNRVM0xDWACCoa8789hYoxvhd1iO3e4kwXBa
ABPcZckioq+HiQ4UIxC2AhQ1FuTeIUTq7LZ0HtAAdKFi48U4LzmPhNUpG1E/HbJ3
GQbi4u1cAzGYuhdywktgBhn9bJ4XB7+X3815Y9qKkuRcwtXgKGDy8HkCgYEAxyly
vc7NBkLfIAmkOsm6VXfvfBTEUBUGi6+k1rarTUxWFIgRuk4FHywwWUTdxWBKJz3n
HNNJb/g7CcufdhLTuWVHQtJDxYf2cJjoi+Kf7/i/Qs9Nyhokj5Mnh6KlZQOWXpZd
Gwn/O13NeDxt1TIVO2xp6zY4FhVEPvaHuxsMCtECgYA7/eR/P6iO3nZoCJbdXhXy
spftEw0FSCg8p53SzIcXUCzRrcM4HavP0181zb5VebzFP8Bvun/WoRGOLSPwyP0L
1T8Pf7huuGSIEERuxvY3dC8raxQvGxJMnOiA0/Ss/Lfg8hfIsEWashPb0pMuOYpZ
JlblgfejCSlQzOOZhlxB+QKBgQCKmizRLV9/0QAJAsy5YPR9UJdpCebJOKiyg806
5Ct5AvwRE9UKjAuCczU+mu+f0fApOSpi5CQCeYVUvtG90UJpjrM2LLCfgoyeNbv4
xgG6dqlcbHrdgK4bATUMbsOd9g4qy4gGLkHi5df9qkhhi5Y9Iajg2X3U2H4DN3yk
WSFbUQKBgQCLz333qWOuT3OBv+EYxHDQUS4YG+dReUos+v0iPJzu+spnfibBF5IC
RjHIhPsdN1byNB0naXOkkz4tUlLGXv6umFgDtQvy/2rxvxQmUGp/WY1VM2+164Xe
NEWdMEU6UckCoMO77kw8JosKhmXCYaSW5bWwnXuEpOj9WWpwjKtxlA==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDQDCCAiigAwIBAgIUe90yOBN1KBxOEr2jro3epamZksIwDQYJKoZIhvcNAQEL
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
cmF0ZWQgQ0EwHhcNMjQwMTE2MDIzMjIyWhcNMjcwMTE1MDIzMjIyWjAPMQ0wCwYD
VQQDEwRlczAxMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxGEL71pV
j8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/26QuYAkoYL+WuJNfYjg5F/O8W
VVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8mAeqMQPD+4jitOwjOsYzbuFCb
nYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59pHICxS7Cq304LDTRQbNoT8HO
4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8g5tSyfQUDCoEKtTOc3Pe5zCB
vIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r8e9GYmLY/NhxEn3dWTqRhHeg
UD13O8o1aBWonwIDAQABo28wbTAdBgNVHQ4EFgQUXvGJtSf2/mLOK17AzUridtCV
xWwwHwYDVR0jBBgwFoAUwMg5afDWXz3vDQhlZRyUlXJRjoUwIAYDVR0RBBkwF4IJ
bG9jYWxob3N0hwR/AAABggRlczAxMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQAD
ggEBACaNq3ZqrbsGvbEtrf6kJGIsTokTFHeVJUSYmt1ZZzDFLSepXAC/J8gphV45
B+YSlkDPNTwMYlf7TUYY872zkdqOXN9r0NUx8MzVAX0+rux0RJba5GGUvJGZDNMX
WM5z9ry1KjQSQ1bSoRQOD3QArmBmhvikHjLc97Vqt56N0wA/ztXWOpNZX/TXmast
aXlUbcfQE73Cdq9tW1ATXwbQ2Gf7vVAUT3zjZSZbNdgPuBicGJHf85Fhjm2ND4+R
sjLIOQ2YgVxNHYbueScc6lJM5RNK194K7WrEQnRyGHT3NaDUm0FFNl//aQeq1ZVw
6gaUYlkTFauXwEYMDK901cWFaBE=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAxGEL71pVj8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/2
6QuYAkoYL+WuJNfYjg5F/O8WVVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8m
AeqMQPD+4jitOwjOsYzbuFCbnYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59
pHICxS7Cq304LDTRQbNoT8HO4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8
g5tSyfQUDCoEKtTOc3Pe5zCBvIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r
8e9GYmLY/NhxEn3dWTqRhHegUD13O8o1aBWonwIDAQABAoIBADJ3A/Om4az5dcce
96EBU9q+IDBBh2Wr1wzSk9p3sqoM47fLqH5b4dzYwJ1yZw2FwFtFFLw6jqExyexE
7JY8gyAFwPZyJ3pKQHuX1gQuRlYxchB9quU8Kn230LA+w1mT2lXrLj2PzWWvAsAv
m837KiFMpP0O5EjB07u8kLsRr1mG6QQ24Kc8oxd7xLXIiPzSvsOpYwo9hmIWENd5
kyA7oSa9EmN3TRTkKOHI7cFQ3DqIGdO71waUofKOdx39DyHS2YKWxDE/LUjkS9zw
1AyZG09l4uowyLRqwYhivEq9Za6rdc64yheuHatAM9kC2AOcVcsCPZquIe90k4t1
L7e9CAECgYEA1W483xTW8ngzxv9MMuPiW+PwVGRpyQrbO6OZOxdWEYfhrZlk5wlW
XK2T85jqooJwMWPTk1F49vZ9WN2KuLkL65GlkEtkFbxmOiFJjXuWwycbFSk05hPs
4AESBYHieaSPcwYhvLeG6g4PFyeqmbAGnKsJaj2ylPwDBOc7LgVlqAECgYEA64wo
gZwaj5SlP8M/OqGH04UVYr1kP/Eq6eiDfMyV5exy+pyzofZyNKUfJfw6sGgyRRHx
OVxlnPMsZ8zbdOXsvUEIeavpwDfQcp5eAURL65I6GMLsx2QpfiN2mDe1MqQW0jct
UleFaURgS84KHLE0+tBBg906jOHGjsE7Q3lyUJ8CgYBYYPev4K9JZGD8bEcfY6Ie
Lvsb1yC+8VHrFkmjYHxxcfUPr89KpGEwq2fynUW72YufyBiajkgq69Ln84U4DNhU
ydDnOXDOV191fsc4YQ8C7LSYRKH1DBcwgwD1at1fRbdpCAb8YHrrfLre+bv5PBzg
zyps5fOHIfwWEbI90lpQAQKBgQDoMMqBMTtxi+r1lucOScrVtFuncOCQs5BE8cIj
1JxzAQk6iBv/LSvZP2gcDq5f1Oaw9YXfsHguJfwA+ozeiAQ9bw0Gu3N52sstIXWz
M/rO5d9FJ2k3CEJqqFSwqkGBAQXKBUA06jeF1DREpX+MVxbNo1rhvMOJusn7UPm1
gtMwKwKBgQCfRzFO10ITwrw8rcRZwO9Axgqf11V7xn6qpgRxj4h0HOErVTCN1H0b
vE3Pz7cxS/g9vFRP37TuqBLfGVzPt9LAEFwCWPeZJLROBLHyu8XrhTbQx+sI2/pe
SBEJAQAHtYasFTE0sBEKNEY2rIt1c29XZhyhhtNKD9gRN/gB355wLg==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,7 @@
instances:
- name: es01
dns:
- es01
- localhost
ip:
- 127.0.0.1

View File

@ -191,5 +191,11 @@
"listen": "0.0.0.0:636",
"upstream": "ldap:636",
"enabled": true
},
{
"name": "elasticsearch",
"listen": "0.0.0.0:9200",
"upstream": "elasticsearch:9200",
"enabled": true
}
]

View File

@ -1 +1,2 @@
toxiproxy
elasticsearch

View File

@ -59,7 +59,7 @@ fields(action_create) ->
action(create),
index(),
id(false),
doc(true),
doc(),
routing(),
require_alias(),
overwrite()
@ -72,7 +72,8 @@ fields(action_update) ->
action(update),
index(),
id(true),
doc(true),
doc(),
doc_as_upsert(),
routing(),
require_alias()
| http_common_opts()
@ -98,10 +99,14 @@ action_union_member_selector({value, Value}) ->
[?R_REF(action_delete)];
#{<<"action">> := <<"update">>} ->
[?R_REF(action_update)];
_ ->
#{<<"action">> := Action} when is_atom(Action) ->
Value1 = Value#{<<"action">> => atom_to_binary(Action)},
action_union_member_selector({value, Value1});
Actual ->
Expected = "create | delete | update",
throw(#{
field_name => action,
actual => Actual,
expected => Expected
})
end.
@ -149,12 +154,12 @@ id(Required) ->
}
)}.
doc(Required) ->
doc() ->
{doc,
?HOCON(
binary(),
#{
required => Required,
required => false,
example => <<"${payload.doc}">>,
desc => ?DESC("config_parameters_doc")
}
@ -168,6 +173,17 @@ http_common_opts() ->
emqx_bridge_http_schema:fields("parameters_opts")
).
doc_as_upsert() ->
{doc_as_upsert,
?HOCON(
boolean(),
#{
required => false,
default => false,
desc => ?DESC("config_doc_as_upsert")
}
)}.
routing() ->
{routing,
?HOCON(

View File

@ -33,6 +33,8 @@
connector_example_values/0
]).
-export([render_template/2]).
%% emqx_connector_resource behaviour callbacks
-export([connector_config/2]).
@ -200,7 +202,7 @@ on_start(InstanceId, Config) ->
?SLOG(info, #{
msg => "elasticsearch_bridge_started",
instance_id => InstanceId,
request => maps:get(request, State, <<>>)
request => emqx_utils:redact(maps:get(request, State, <<>>))
}),
?tp(elasticsearch_bridge_started, #{instance_id => InstanceId}),
{ok, State#{channels => #{}}};
@ -208,7 +210,7 @@ on_start(InstanceId, Config) ->
?SLOG(error, #{
msg => "failed_to_start_elasticsearch_bridge",
instance_id => InstanceId,
request => maps:get(request, Config, <<>>),
request => emqx_utils:redact(maps:get(request, Config, <<>>)),
reason => Reason
}),
throw(failed_to_start_elasticsearch_bridge)
@ -286,8 +288,12 @@ on_add_channel(
method => method(Parameter),
body => get_body_template(Parameter)
},
ChannelConfig = #{
parameters => Parameter1,
render_template_func => fun ?MODULE:render_template/2
},
{ok, State} = emqx_bridge_http_connector:on_add_channel(
InstanceId, State0, ChannelId, #{parameters => Parameter1}
InstanceId, State0, ChannelId, ChannelConfig
),
Channel = Parameter1,
Channels2 = Channels#{ChannelId => Channel},
@ -310,9 +316,23 @@ on_get_channel_status(_InstanceId, ChannelId, #{channels := Channels}) ->
{error, not_exists}
end.
render_template(Template, Msg) ->
% Ignoring errors here, undefined bindings will be replaced with empty string.
Opts = #{var_trans => fun to_string/2},
{String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}, Opts),
String.
%%--------------------------------------------------------------------
%% Internal Functions
%%--------------------------------------------------------------------
to_string(Name, Value) ->
emqx_template:to_string(render_var(Name, Value)).
render_var(_, undefined) ->
% NOTE Any allowed but undefined binding will be replaced with empty string
<<>>;
render_var(_Name, Value) ->
Value.
%% delete DELETE /<index>/_doc/<_id>
path(#{action := delete, id := Id, index := Index} = Action) ->
BasePath = ["/", Index, "/_doc/", Id],
@ -354,6 +374,7 @@ add_query_string(Keys, Param0) ->
end.
to_str(List) when is_list(List) -> List;
to_str(Bin) when is_binary(Bin) -> binary_to_list(Bin);
to_str(false) -> "false";
to_str(true) -> "true";
to_str(Atom) when is_atom(Atom) -> atom_to_list(Atom).
@ -369,5 +390,12 @@ handle_response({ok, Code, Body}) ->
handle_response({error, _} = Error) ->
Error.
get_body_template(#{doc := Doc}) -> Doc;
get_body_template(_) -> undefined.
get_body_template(#{action := update, doc := Doc} = Template) ->
case maps:get(doc_as_upsert, Template, false) of
false -> <<"{\"doc\":", Doc/binary, "}">>;
true -> <<"{\"doc\":", Doc/binary, ",\"doc_as_upsert\": true}">>
end;
get_body_template(#{doc := Doc}) ->
Doc;
get_body_template(_) ->
undefined.

View File

@ -0,0 +1,379 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_es_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(TYPE, elasticsearch).
-define(CA, "es.crt").
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ProxyName = "elasticsearch",
ESHost = os:getenv("ELASTICSEARCH_HOST", "elasticsearch"),
ESPort = list_to_integer(os:getenv("ELASTICSEARCH_PORT", "9200")),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_es,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
wait_until_elasticsearch_is_up(ESHost, ESPort),
[
{apps, Apps},
{proxy_name, ProxyName},
{es_host, ESHost},
{es_port, ESPort}
| Config
].
es_checks() ->
case os:getenv("IS_CI") of
"yes" -> 10;
_ -> 1
end.
wait_until_elasticsearch_is_up(Host, Port) ->
wait_until_elasticsearch_is_up(es_checks(), Host, Port).
wait_until_elasticsearch_is_up(0, Host, Port) ->
throw({{Host, Port}, not_available});
wait_until_elasticsearch_is_up(Count, Host, Port) ->
timer:sleep(1000),
case emqx_common_test_helpers:is_all_tcp_servers_available([{Host, Port}]) of
true -> ok;
false -> wait_until_elasticsearch_is_up(Count - 1, Host, Port)
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
%ProxyHost = ?config(proxy_host, Config),
%ProxyPort = ?config(proxy_port, Config),
%emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
%ProxyHost = ?config(proxy_host, Config),
%ProxyPort = ?config(proxy_port, Config),
%emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok.
%%-------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------
check_send_message_with_action(Topic, ActionName, ConnectorName) ->
send_message(Topic),
%% ######################################
%% Check if message is sent to es
%% ######################################
timer:sleep(500),
check_action_metrics(ActionName, ConnectorName).
send_message(Topic) ->
Now = emqx_utils_calendar:now_to_rfc3339(microsecond),
Doc = #{<<"name">> => <<"emqx">>, <<"release_date">> => Now},
Index = <<"emqx-test-index">>,
Payload = emqx_utils_json:encode(#{doc => Doc, index => Index}),
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
{ok, Client} = emqtt:start_link([{clientid, ClientId}, {port, 1883}]),
{ok, _} = emqtt:connect(Client),
ok = emqtt:publish(Client, Topic, Payload, [{qos, 0}]),
ok.
check_action_metrics(ActionName, ConnectorName) ->
ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
Metrics =
#{
match => emqx_resource_metrics:matched_get(ActionId),
success => emqx_resource_metrics:success_get(ActionId),
failed => emqx_resource_metrics:failed_get(ActionId),
queuing => emqx_resource_metrics:queuing_get(ActionId),
dropped => emqx_resource_metrics:dropped_get(ActionId)
},
?assertEqual(
#{
match => 1,
success => 1,
dropped => 0,
failed => 0,
queuing => 0
},
Metrics,
{ActionName, ConnectorName, ActionId}
).
action_config(ConnectorName) ->
action_config(ConnectorName, _Overrides = #{}).
action_config(ConnectorName, Overrides) ->
Cfg0 = action(ConnectorName),
emqx_utils_maps:deep_merge(Cfg0, Overrides).
action(ConnectorName) ->
#{
<<"description">> => <<"My elasticsearch test action">>,
<<"enable">> => true,
<<"parameters">> => #{
<<"index">> => <<"${payload.index}">>,
<<"action">> => <<"create">>,
<<"doc">> => <<"${payload.doc}">>,
<<"overwrite">> => true
},
<<"connector">> => ConnectorName,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"30s">>,
<<"query_mode">> => <<"sync">>
}
}.
base_url(Config) ->
Host = ?config(es_host, Config),
Port = ?config(es_port, Config),
iolist_to_binary([
"https://",
Host,
":",
integer_to_binary(Port)
]).
connector_config(Config) ->
connector_config(_Overrides = #{}, Config).
connector_config(Overrides, Config) ->
Defaults =
#{
<<"base_url">> => base_url(Config),
<<"enable">> => true,
<<"authentication">> => #{
<<"password">> => <<"emqx123">>,
<<"username">> => <<"elastic">>
},
<<"description">> => <<"My elasticsearch test connector">>,
<<"connect_timeout">> => <<"15s">>,
<<"pool_size">> => 2,
<<"pool_type">> => <<"random">>,
<<"enable_pipelining">> => 100,
<<"ssl">> => #{
<<"enable">> => true,
<<"hibernate_after">> => <<"5s">>,
<<"cacertfile">> => filename:join(?config(data_dir, Config), ?CA)
}
},
emqx_utils_maps:deep_merge(Defaults, Overrides).
create_connector(Name, Config) ->
Res = emqx_connector:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
Res.
create_action(Name, Config) ->
Res = emqx_bridge_v2:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
Res.
action_api_spec_props_for_get() ->
#{
<<"bridge_elasticsearch.get_bridge_v2">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:actions_api_spec_schemas(),
Props.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_create_remove_list(Config) ->
[] = emqx_bridge_v2:list(),
ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, test_connector, ConnectorConfig),
ActionConfig = action(<<"test_connector">>),
{ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig),
[ActionInfo] = emqx_bridge_v2:list(),
#{
name := <<"test_action_1">>,
type := <<"elasticsearch">>,
raw_config := _,
status := connected
} = ActionInfo,
{ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig),
2 = length(emqx_bridge_v2:list()),
ok = emqx_bridge_v2:remove(?TYPE, test_action_1),
1 = length(emqx_bridge_v2:list()),
ok = emqx_bridge_v2:remove(?TYPE, test_action_2),
[] = emqx_bridge_v2:list(),
emqx_connector:remove(?TYPE, test_connector),
ok.
%% Test sending a message to a bridge V2
t_send_message(Config) ->
ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig),
ActionConfig = action(<<"test_connector2">>),
{ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig),
Rule = #{
id => <<"rule:t_es">>,
sql => <<"SELECT\n *\nFROM\n \"es/#\"">>,
actions => [<<"elasticsearch:test_action_1">>],
description => <<"sink doc to elasticsearch">>
},
{ok, _} = emqx_rule_engine:create_rule(Rule),
%% Use the action to send a message
check_send_message_with_action(<<"es/1">>, test_action_1, test_connector2),
%% Create a few more bridges with the same connector and test them
ActionNames1 =
lists:foldl(
fun(I, Acc) ->
Seq = integer_to_binary(I),
ActionNameStr = "test_action_" ++ integer_to_list(I),
ActionName = list_to_atom(ActionNameStr),
{ok, _} = emqx_bridge_v2:create(?TYPE, ActionName, ActionConfig),
Rule1 = #{
id => <<"rule:t_es", Seq/binary>>,
sql => <<"SELECT\n *\nFROM\n \"es/", Seq/binary, "\"">>,
actions => [<<"elasticsearch:", (list_to_binary(ActionNameStr))/binary>>],
description => <<"sink doc to elasticsearch">>
},
{ok, _} = emqx_rule_engine:create_rule(Rule1),
Topic = <<"es/", Seq/binary>>,
check_send_message_with_action(Topic, ActionName, test_connector2),
[ActionName | Acc]
end,
[],
lists:seq(2, 10)
),
ActionNames = [test_action_1 | ActionNames1],
%% Remove all the bridges
lists:foreach(
fun(BridgeName) ->
ok = emqx_bridge_v2:remove(?TYPE, BridgeName)
end,
ActionNames
),
emqx_connector:remove(?TYPE, test_connector2),
ok.
%% Test that we can get the status of the bridge V2
t_health_check(Config) ->
BridgeV2Config = action(<<"test_connector3">>),
ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig),
{ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config),
#{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2),
ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2),
%% Check behaviour when bridge does not exist
{error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2),
ok = emqx_connector:remove(?TYPE, test_connector3),
ok.
t_bad_url(Config) ->
ConnectorName = <<"test_connector">>,
ActionName = <<"test_action">>,
ActionConfig = action(<<"test_connector">>),
ConnectorConfig0 = connector_config(Config),
ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>},
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
?assertMatch(
{ok, #{
resource_data :=
#{
status := ?status_disconnected,
error := failed_to_start_elasticsearch_bridge
}
}},
emqx_connector:lookup(?TYPE, ConnectorName)
),
?assertMatch({ok, #{status := ?status_disconnected}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
ok.
t_parameters_key_api_spec(_Config) ->
ActionProps = action_api_spec_props_for_get(),
?assertNot(is_map_key(<<"elasticsearch">>, ActionProps), #{action_props => ActionProps}),
?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
ok.
t_http_api_get(Config) ->
ConnectorName = <<"test_connector">>,
ActionName = <<"test_action">>,
ActionConfig = action(ConnectorName),
ConnectorConfig = connector_config(Config),
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
?assertMatch(
{ok,
{{_, 200, _}, _, [
#{
<<"connector">> := ConnectorName,
<<"description">> := <<"My elasticsearch test action">>,
<<"enable">> := true,
<<"error">> := <<>>,
<<"name">> := ActionName,
<<"node_status">> :=
[
#{
<<"node">> := _,
<<"status">> := <<"connected">>,
<<"status_reason">> := <<>>
}
],
<<"parameters">> :=
#{
<<"action">> := <<"create">>,
<<"doc">> := <<"${payload.doc}">>,
<<"index">> := <<"${payload.index}">>,
<<"max_retries">> := 2,
<<"overwrite">> := true
},
<<"resource_opts">> := #{<<"query_mode">> := <<"sync">>},
<<"status">> := <<"connected">>,
<<"status_reason">> := <<>>,
<<"type">> := <<"elasticsearch">>
}
]}},
emqx_bridge_v2_testlib:list_bridges_api()
),
ok.

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB
CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu
ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG
A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM
pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14
cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS
fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq
0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt
MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ
AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME
GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG
SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs
hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X
zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L
mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4
iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK
cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p
-----END CERTIFICATE-----

View File

@ -38,6 +38,7 @@
]).
-export([reply_delegator/3]).
-export([render_template/2]).
-export([
roots/0,
@ -266,7 +267,9 @@ on_add_channel(
) ->
InstalledActions = maps:get(installed_actions, OldState, #{}),
{ok, ActionState} = do_create_http_action(ActionConfig),
NewInstalledActions = maps:put(ActionId, ActionState, InstalledActions),
RenderTmplFunc = maps:get(render_template_func, ActionConfig, fun ?MODULE:render_template/2),
ActionState1 = ActionState#{render_template_func => RenderTmplFunc},
NewInstalledActions = maps:put(ActionId, ActionState1, InstalledActions),
NewState = maps:put(installed_actions, NewInstalledActions, OldState),
{ok, NewState}.
@ -631,9 +634,10 @@ parse_template(String) ->
process_request_and_action(Request, ActionState, Msg) ->
MethodTemplate = maps:get(method, ActionState),
Method = make_method(render_template_string(MethodTemplate, Msg)),
PathPrefix = unicode:characters_to_list(render_template(maps:get(path, Request), Msg)),
PathSuffix = unicode:characters_to_list(render_template(maps:get(path, ActionState), Msg)),
RenderTmplFunc = maps:get(render_template_func, ActionState),
Method = make_method(render_template_string(MethodTemplate, RenderTmplFunc, Msg)),
PathPrefix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, Request), Msg)),
PathSuffix = unicode:characters_to_list(RenderTmplFunc(maps:get(path, ActionState), Msg)),
Path =
case PathSuffix of
@ -644,11 +648,11 @@ process_request_and_action(Request, ActionState, Msg) ->
HeadersTemplate1 = maps:get(headers, Request),
HeadersTemplate2 = maps:get(headers, ActionState),
Headers = merge_proplist(
render_headers(HeadersTemplate1, Msg),
render_headers(HeadersTemplate2, Msg)
render_headers(HeadersTemplate1, RenderTmplFunc, Msg),
render_headers(HeadersTemplate2, RenderTmplFunc, Msg)
),
BodyTemplate = maps:get(body, ActionState),
Body = render_request_body(BodyTemplate, Msg),
Body = render_request_body(BodyTemplate, RenderTmplFunc, Msg),
#{
method => Method,
path => Path,
@ -681,25 +685,26 @@ process_request(
} = Conf,
Msg
) ->
RenderTemplateFun = fun render_template/2,
Conf#{
method => make_method(render_template_string(MethodTemplate, Msg)),
path => unicode:characters_to_list(render_template(PathTemplate, Msg)),
body => render_request_body(BodyTemplate, Msg),
headers => render_headers(HeadersTemplate, Msg),
method => make_method(render_template_string(MethodTemplate, RenderTemplateFun, Msg)),
path => unicode:characters_to_list(RenderTemplateFun(PathTemplate, Msg)),
body => render_request_body(BodyTemplate, RenderTemplateFun, Msg),
headers => render_headers(HeadersTemplate, RenderTemplateFun, Msg),
request_timeout => ReqTimeout
}.
render_request_body(undefined, Msg) ->
render_request_body(undefined, _, Msg) ->
emqx_utils_json:encode(Msg);
render_request_body(BodyTks, Msg) ->
render_template(BodyTks, Msg).
render_request_body(BodyTks, RenderTmplFunc, Msg) ->
RenderTmplFunc(BodyTks, Msg).
render_headers(HeaderTks, Msg) ->
render_headers(HeaderTks, RenderTmplFunc, Msg) ->
lists:map(
fun({K, V}) ->
{
render_template_string(K, Msg),
render_template_string(emqx_secret:unwrap(V), Msg)
render_template_string(K, RenderTmplFunc, Msg),
render_template_string(emqx_secret:unwrap(V), RenderTmplFunc, Msg)
}
end,
HeaderTks
@ -710,8 +715,8 @@ render_template(Template, Msg) ->
{String, _Errors} = emqx_template:render(Template, {emqx_jsonish, Msg}),
String.
render_template_string(Template, Msg) ->
unicode:characters_to_binary(render_template(Template, Msg)).
render_template_string(Template, RenderTmplFunc, Msg) ->
unicode:characters_to_binary(RenderTmplFunc(Template, Msg)).
make_method(M) when M == <<"POST">>; M == <<"post">> -> post;
make_method(M) when M == <<"PUT">>; M == <<"put">> -> put;

View File

@ -146,7 +146,7 @@ end_per_testcase(_TestCase, Config) ->
%%------------------------------------------------------------------------------
%% HTTP server for testing
%% (Orginally copied from emqx_bridge_api_SUITE)
%% (Originally copied from emqx_bridge_api_SUITE)
%%------------------------------------------------------------------------------
start_http_server(HTTPServerConfig) ->
process_flag(trap_exit, true),

View File

@ -212,7 +212,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) ->
?SLOG(info, #{
msg => "iotdb_bridge_started",
instance_id => InstanceId,
request => maps:get(request, State, <<>>)
request => emqx_utils:redact(maps:get(request, State, <<>>))
}),
?tp(iotdb_bridge_started, #{instance_id => InstanceId}),
{ok, State#{iotdb_version => Version, channels => #{}}};
@ -220,7 +220,7 @@ on_start(InstanceId, #{iotdb_version := Version} = Config) ->
?SLOG(error, #{
msg => "failed_to_start_iotdb_bridge",
instance_id => InstanceId,
request => maps:get(request, Config, <<>>),
request => emqx_utils:redact(maps:get(request, Config, <<>>)),
reason => Reason
}),
throw(failed_to_start_iotdb_bridge)

View File

@ -150,6 +150,9 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
ok = emqx_connector_resource:remove(Type, Name),
?tp(connector_post_config_update_done, #{}),
ok;
{error, not_found} ->
?tp(connector_post_config_update_done, #{}),
ok;
{ok, Channels} ->
{error, {active_channels, Channels}}
end;

1
changes/feat-12348.en.md Normal file
View File

@ -0,0 +1 @@
Added support for Elasticsearch Bridge.

View File

@ -56,6 +56,12 @@ config_routing.desc:
config_routing.label:
"""Routing"""
config_doc_as_upsert.desc:
"""Instead of sending a partial doc plus an upsert doc,
you can set doc_as_upsert to true to use the contents of doc as the upsert value."""
config_doc_as_upsert.label:
"""doc_as_upsert"""
config_wait_for_active_shards.desc:
"""The number of shard copies that must be active before proceeding with the operation.
Set to all or any positive integer up to the total number of shards in the index (number_of_replicas+1).
@ -97,7 +103,7 @@ config_parameters_require_alias.label:
"""_require_alias"""
config_parameters_doc.desc:
"""JSON document"""
"""JSON document. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc."""
config_parameters_doc.label:
"""doc"""

View File

@ -246,6 +246,9 @@ for dep in ${CT_DEPS}; do
otel)
FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' )
;;
elasticsearch)
FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' )
;;
*)
echo "unknown_ct_dependency $dep"
exit 1

View File

@ -299,3 +299,5 @@ now_us
ns
elasticsearch
ElasticSearch
doc_as_upsert
upsert