test: add es docker CI test

This commit is contained in:
zhongwencool 2024-01-18 08:56:39 +08:00
parent 7f5fe91905
commit 91368a57ff
15 changed files with 528 additions and 5 deletions

View File

@ -16,4 +16,13 @@ HSTREAMDB_ZK_TAG=3.8.1
MS_IMAGE_ADDR=mcr.microsoft.com/mssql/server
SQLSERVER_TAG=2019-CU19-ubuntu-20.04
# Password for the 'elastic' user (at least 6 characters)
ELASTIC_PASSWORD="emqx123"
# Password for the 'kibana_system' user (at least 6 characters)
KIBANA_PASSWORD="emqx123"
# Version of Elastic products
ELASTIC_TAG=8.11.4
LICENSE=basic
TARGET=emqx/emqx

View File

@ -36,6 +36,8 @@ services:
setup:
condition: service_healthy
image: public.ecr.aws/elastic/elasticsearch:${ELASTIC_TAG}
container_name: elasticsearch
hostname: elasticsearch
volumes:
- ./elastic:/usr/share/elasticsearch/config/certs
- esdata01:/usr/share/elasticsearch/data
@ -66,6 +68,9 @@ services:
interval: 10s
timeout: 10s
retries: 120
restart: always
networks:
- emqx_bridge
kibana:
depends_on:
@ -93,6 +98,9 @@ services:
interval: 10s
timeout: 10s
retries: 120
restart: always
networks:
- emqx_bridge
volumes:
esdata01:

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB
CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu
ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG
A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM
pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14
cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS
fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq
0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt
MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ
AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME
GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG
SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs
hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X
zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L
mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4
iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK
cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAstJ8IhLq1JCDxTC1dcHlTKT5PwJcGVM6o5Hi+OILGOd1yPUN
OcP7C/buj88K14EwhVjV8yPfjKALafKYT5J9eHJoOf97C3vWHBVRY2CvzKJY5VGz
w4/uHYJPJfsORUUotPiTeZK1Ju62JPZaIwJHkn1/EVO3pChm4wVyBbT8V+RjWnbv
UYnUeRpBzN166ACKK4rPGxTszIBZ/Ysm2J5IatEvh6b9APBVcc5XtCjWz6bCJMVx
9g5z9oj0w8pvcSTZfSL/TD7Y/V2mph+05l3ErTG45ALnSrAsOG0boR6JzNegyFK/
LF+VHEXAFsidcdaqWWjszHYdlXWABzbjrGcMyQIDAQABAoIBAAZOLXYanmjpIRpX
h7h7oikYEplWDRcQBBvvKZaOyuchhznTKTiZmF0xQ3Ny8J4Ndj9ndODWSZxI6uod
FaGNp+qytwnfgDBVGSVDm6tyRfSkX1fTsA/j3/iupvmO/w9yezdZYgLaCVTyex31
yVMdchZgYjYDUpEBYzJbV2xL18+GBRmmPjdXumlpcJqcclxjOQJSu/1WCGVfn/e/
64NQpAm7NSKLqeUl32g0/DvUpmYRfmf7ZjVUjePaJQU6sw5/N+3V9F1hYs8VSWz0
OMzYIfUcvixw+VWx5bu0nWt98FirhsQPjCTThD+DHP6koXGrdXpeMOQE1YZmoV5T
vP0X+FECgYEA5dsKVDQFL67muqz3CNRVM0xDWACCoa8789hYoxvhd1iO3e4kwXBa
ABPcZckioq+HiQ4UIxC2AhQ1FuTeIUTq7LZ0HtAAdKFi48U4LzmPhNUpG1E/HbJ3
GQbi4u1cAzGYuhdywktgBhn9bJ4XB7+X3815Y9qKkuRcwtXgKGDy8HkCgYEAxyly
vc7NBkLfIAmkOsm6VXfvfBTEUBUGi6+k1rarTUxWFIgRuk4FHywwWUTdxWBKJz3n
HNNJb/g7CcufdhLTuWVHQtJDxYf2cJjoi+Kf7/i/Qs9Nyhokj5Mnh6KlZQOWXpZd
Gwn/O13NeDxt1TIVO2xp6zY4FhVEPvaHuxsMCtECgYA7/eR/P6iO3nZoCJbdXhXy
spftEw0FSCg8p53SzIcXUCzRrcM4HavP0181zb5VebzFP8Bvun/WoRGOLSPwyP0L
1T8Pf7huuGSIEERuxvY3dC8raxQvGxJMnOiA0/Ss/Lfg8hfIsEWashPb0pMuOYpZ
JlblgfejCSlQzOOZhlxB+QKBgQCKmizRLV9/0QAJAsy5YPR9UJdpCebJOKiyg806
5Ct5AvwRE9UKjAuCczU+mu+f0fApOSpi5CQCeYVUvtG90UJpjrM2LLCfgoyeNbv4
xgG6dqlcbHrdgK4bATUMbsOd9g4qy4gGLkHi5df9qkhhi5Y9Iajg2X3U2H4DN3yk
WSFbUQKBgQCLz333qWOuT3OBv+EYxHDQUS4YG+dReUos+v0iPJzu+spnfibBF5IC
RjHIhPsdN1byNB0naXOkkz4tUlLGXv6umFgDtQvy/2rxvxQmUGp/WY1VM2+164Xe
NEWdMEU6UckCoMO77kw8JosKhmXCYaSW5bWwnXuEpOj9WWpwjKtxlA==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDQDCCAiigAwIBAgIUe90yOBN1KBxOEr2jro3epamZksIwDQYJKoZIhvcNAQEL
BQAwNDEyMDAGA1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5l
cmF0ZWQgQ0EwHhcNMjQwMTE2MDIzMjIyWhcNMjcwMTE1MDIzMjIyWjAPMQ0wCwYD
VQQDEwRlczAxMIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAxGEL71pV
j8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/26QuYAkoYL+WuJNfYjg5F/O8W
VVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8mAeqMQPD+4jitOwjOsYzbuFCb
nYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59pHICxS7Cq304LDTRQbNoT8HO
4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8g5tSyfQUDCoEKtTOc3Pe5zCB
vIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r8e9GYmLY/NhxEn3dWTqRhHeg
UD13O8o1aBWonwIDAQABo28wbTAdBgNVHQ4EFgQUXvGJtSf2/mLOK17AzUridtCV
xWwwHwYDVR0jBBgwFoAUwMg5afDWXz3vDQhlZRyUlXJRjoUwIAYDVR0RBBkwF4IJ
bG9jYWxob3N0hwR/AAABggRlczAxMAkGA1UdEwQCMAAwDQYJKoZIhvcNAQELBQAD
ggEBACaNq3ZqrbsGvbEtrf6kJGIsTokTFHeVJUSYmt1ZZzDFLSepXAC/J8gphV45
B+YSlkDPNTwMYlf7TUYY872zkdqOXN9r0NUx8MzVAX0+rux0RJba5GGUvJGZDNMX
WM5z9ry1KjQSQ1bSoRQOD3QArmBmhvikHjLc97Vqt56N0wA/ztXWOpNZX/TXmast
aXlUbcfQE73Cdq9tW1ATXwbQ2Gf7vVAUT3zjZSZbNdgPuBicGJHf85Fhjm2ND4+R
sjLIOQ2YgVxNHYbueScc6lJM5RNK194K7WrEQnRyGHT3NaDUm0FFNl//aQeq1ZVw
6gaUYlkTFauXwEYMDK901cWFaBE=
-----END CERTIFICATE-----

View File

@ -0,0 +1,27 @@
-----BEGIN RSA PRIVATE KEY-----
MIIEpAIBAAKCAQEAxGEL71pVj8qoUxEuL7qjRSeS1eHxeKhu2jqEZb7iA1o/7b/2
6QuYAkoYL+WuJNfYjg5F/O8WVVuAYIlN6a/mC6wT2t3pX4YSrdp+i3gtAC/LX+8m
AeqMQPD+4jitOwjOsYzbuFCbnYl86dnFPl/+Pmj20mtZ+Wt7oIPD88j6+r5qgv59
pHICxS7Cq304LDTRQbNoT8HO4c9VGGGtWIdtrqiYrz1OVefkffMrvFt77v6dKHn8
g5tSyfQUDCoEKtTOc3Pe5zCBvIMs6HaapoSkl8XdpFHQ712PCZRebAMCrVcPYQ3r
8e9GYmLY/NhxEn3dWTqRhHegUD13O8o1aBWonwIDAQABAoIBADJ3A/Om4az5dcce
96EBU9q+IDBBh2Wr1wzSk9p3sqoM47fLqH5b4dzYwJ1yZw2FwFtFFLw6jqExyexE
7JY8gyAFwPZyJ3pKQHuX1gQuRlYxchB9quU8Kn230LA+w1mT2lXrLj2PzWWvAsAv
m837KiFMpP0O5EjB07u8kLsRr1mG6QQ24Kc8oxd7xLXIiPzSvsOpYwo9hmIWENd5
kyA7oSa9EmN3TRTkKOHI7cFQ3DqIGdO71waUofKOdx39DyHS2YKWxDE/LUjkS9zw
1AyZG09l4uowyLRqwYhivEq9Za6rdc64yheuHatAM9kC2AOcVcsCPZquIe90k4t1
L7e9CAECgYEA1W483xTW8ngzxv9MMuPiW+PwVGRpyQrbO6OZOxdWEYfhrZlk5wlW
XK2T85jqooJwMWPTk1F49vZ9WN2KuLkL65GlkEtkFbxmOiFJjXuWwycbFSk05hPs
4AESBYHieaSPcwYhvLeG6g4PFyeqmbAGnKsJaj2ylPwDBOc7LgVlqAECgYEA64wo
gZwaj5SlP8M/OqGH04UVYr1kP/Eq6eiDfMyV5exy+pyzofZyNKUfJfw6sGgyRRHx
OVxlnPMsZ8zbdOXsvUEIeavpwDfQcp5eAURL65I6GMLsx2QpfiN2mDe1MqQW0jct
UleFaURgS84KHLE0+tBBg906jOHGjsE7Q3lyUJ8CgYBYYPev4K9JZGD8bEcfY6Ie
Lvsb1yC+8VHrFkmjYHxxcfUPr89KpGEwq2fynUW72YufyBiajkgq69Ln84U4DNhU
ydDnOXDOV191fsc4YQ8C7LSYRKH1DBcwgwD1at1fRbdpCAb8YHrrfLre+bv5PBzg
zyps5fOHIfwWEbI90lpQAQKBgQDoMMqBMTtxi+r1lucOScrVtFuncOCQs5BE8cIj
1JxzAQk6iBv/LSvZP2gcDq5f1Oaw9YXfsHguJfwA+ozeiAQ9bw0Gu3N52sstIXWz
M/rO5d9FJ2k3CEJqqFSwqkGBAQXKBUA06jeF1DREpX+MVxbNo1rhvMOJusn7UPm1
gtMwKwKBgQCfRzFO10ITwrw8rcRZwO9Axgqf11V7xn6qpgRxj4h0HOErVTCN1H0b
vE3Pz7cxS/g9vFRP37TuqBLfGVzPt9LAEFwCWPeZJLROBLHyu8XrhTbQx+sI2/pe
SBEJAQAHtYasFTE0sBEKNEY2rIt1c29XZhyhhtNKD9gRN/gB355wLg==
-----END RSA PRIVATE KEY-----

View File

@ -0,0 +1,7 @@
instances:
- name: es01
dns:
- es01
- localhost
ip:
- 127.0.0.1

View File

@ -191,5 +191,11 @@
"listen": "0.0.0.0:636",
"upstream": "ldap:636",
"enabled": true
},
{
"name": "elasticsearch",
"listen": "0.0.0.0:9200",
"upstream": "elasticsearch:9200",
"enabled": true
}
]

View File

@ -1 +1,2 @@
toxiproxy
elasticsearch

View File

@ -59,7 +59,7 @@ fields(action_create) ->
action(create),
index(),
id(false),
doc(true),
doc(),
routing(),
require_alias(),
overwrite()
@ -72,7 +72,7 @@ fields(action_update) ->
action(update),
index(),
id(true),
doc(true),
doc(),
routing(),
require_alias()
| http_common_opts()
@ -153,12 +153,12 @@ id(Required) ->
}
)}.
doc(Required) ->
doc() ->
{doc,
?HOCON(
binary(),
#{
required => Required,
required => false,
example => <<"${payload.doc}">>,
desc => ?DESC("config_parameters_doc")
}

View File

@ -0,0 +1,372 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2024 EMQ Technologies Co., Ltd. All Rights Reserved.
%%
%% Licensed under the Apache License, Version 2.0 (the "License");
%% you may not use this file except in compliance with the License.
%% You may obtain a copy of the License at
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing, software
%% distributed under the License is distributed on an "AS IS" BASIS,
%% WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
%% See the License for the specific language governing permissions and
%% limitations under the License.
%%--------------------------------------------------------------------
-module(emqx_bridge_es_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
-include_lib("emqx_resource/include/emqx_resource.hrl").
-import(emqx_common_test_helpers, [on_exit/1]).
-define(TYPE, elasticsearch).
-define(CA, "es.crt").
%%------------------------------------------------------------------------------
%% CT boilerplate
%%------------------------------------------------------------------------------
all() ->
emqx_common_test_helpers:all(?MODULE).
init_per_suite(Config) ->
ProxyName = "elasticsearch",
ESHost = os:getenv("ELASTICSEARCH_HOST", "elasticsearch"),
ESPort = list_to_integer(os:getenv("ELASTICSEARCH_PORT", "9200")),
Apps = emqx_cth_suite:start(
[
emqx,
emqx_conf,
emqx_connector,
emqx_bridge_es,
emqx_bridge,
emqx_rule_engine,
emqx_management,
{emqx_dashboard, "dashboard.listeners.http { enable = true, bind = 18083 }"}
],
#{work_dir => emqx_cth_suite:work_dir(Config)}
),
{ok, _} = emqx_common_test_http:create_default_app(),
wait_until_elasticsearch_is_up(ESHost, ESPort),
[
{apps, Apps},
{proxy_name, ProxyName},
{es_host, ESHost},
{es_port, ESPort}
| Config
].
es_checks() ->
case os:getenv("IS_CI") of
"yes" -> 10;
_ -> 1
end.
wait_until_elasticsearch_is_up(Host, Port) ->
wait_until_elasticsearch_is_up(es_checks(), Host, Port).
wait_until_elasticsearch_is_up(0, Host, Port) ->
throw({{Host, Port}, not_available});
wait_until_elasticsearch_is_up(Count, Host, Port) ->
timer:sleep(1000),
case emqx_common_test_helpers:is_all_tcp_servers_available([{Host, Port}]) of
true -> ok;
false -> wait_until_elasticsearch_is_up(Count - 1, Host, Port)
end.
end_per_suite(Config) ->
Apps = ?config(apps, Config),
%ProxyHost = ?config(proxy_host, Config),
%ProxyPort = ?config(proxy_port, Config),
%emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_cth_suite:stop(Apps),
ok.
init_per_testcase(_TestCase, Config) ->
Config.
end_per_testcase(_TestCase, _Config) ->
%ProxyHost = ?config(proxy_host, Config),
%ProxyPort = ?config(proxy_port, Config),
%emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(),
emqx_common_test_helpers:call_janitor(60_000),
ok.
%%-------------------------------------------------------------------------------------
%% Helper fns
%%-------------------------------------------------------------------------------------
check_send_message_with_action(ActionName, ConnectorName) ->
#{payload := _Payload} = send_message(ActionName),
%% ######################################
%% Check if message is sent to es
%% ######################################
check_action_metrics(ActionName, ConnectorName).
send_message(ActionName) ->
%% ######################################
%% Create message
%% ######################################
Time = erlang:unique_integer(),
BinTime = integer_to_binary(Time),
Payload = #{<<"name">> => <<"emqx">>, <<"release_time">> => BinTime},
Index = <<"emqx-test-index">>,
Msg = #{
clientid => BinTime,
payload => Payload,
timestamp => Time,
index => Index
},
%% ######################################
%% Send message
%% ######################################
emqx_bridge_v2:send_message(?TYPE, ActionName, Msg, #{}),
#{payload => Payload}.
check_action_metrics(ActionName, ConnectorName) ->
ActionId = emqx_bridge_v2:id(?TYPE, ActionName, ConnectorName),
Metrics =
#{
match => emqx_resource_metrics:matched_get(ActionId),
failed => emqx_resource_metrics:failed_get(ActionId),
queuing => emqx_resource_metrics:queuing_get(ActionId),
dropped => emqx_resource_metrics:dropped_get(ActionId)
},
?assertEqual(
#{match => 1, dropped => 0, failed => 0, queuing => 0},
Metrics
).
action_config(ConnectorName) ->
action_config(ConnectorName, _Overrides = #{}).
action_config(ConnectorName, Overrides) ->
Cfg0 = action(ConnectorName),
emqx_utils_maps:deep_merge(Cfg0, Overrides).
action(ConnectorName) ->
#{
<<"description">> => <<"My elasticsearch test action">>,
<<"enable">> => true,
<<"parameters">> => #{
<<"index">> => <<"${payload.index}">>,
<<"action">> => <<"create">>,
<<"doc">> => <<"${payload.doc}">>,
<<"overwrite">> => true
},
<<"connector">> => ConnectorName,
<<"resource_opts">> => #{
<<"health_check_interval">> => <<"30s">>,
<<"query_mode">> => <<"async">>
}
}.
base_url(Config) ->
Host = ?config(es_host, Config),
Port = ?config(es_port, Config),
iolist_to_binary([
"https://",
Host,
":",
integer_to_binary(Port)
]).
connector_config(Config) ->
connector_config(_Overrides = #{}, Config).
connector_config(Overrides, Config) ->
Defaults =
#{
<<"base_url">> => base_url(Config),
<<"enable">> => true,
<<"authentication">> => #{
<<"password">> => <<"emqx123">>,
<<"username">> => <<"elastic">>
},
<<"description">> => <<"My elasticsearch test connector">>,
<<"connect_timeout">> => <<"15s">>,
<<"pool_size">> => 2,
<<"pool_type">> => <<"random">>,
<<"enable_pipelining">> => 100,
<<"ssl">> => #{
<<"enable">> => true,
<<"hibernate_after">> => <<"5s">>,
<<"cacertfile">> => filename:join(?config(data_dir, Config), ?CA)
}
},
emqx_utils_maps:deep_merge(Defaults, Overrides).
create_connector(Name, Config) ->
Res = emqx_connector:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_connector:remove(?TYPE, Name) end),
Res.
create_action(Name, Config) ->
Res = emqx_bridge_v2:create(?TYPE, Name, Config),
on_exit(fun() -> emqx_bridge_v2:remove(?TYPE, Name) end),
Res.
action_api_spec_props_for_get() ->
#{
<<"bridge_elasticsearch.get_bridge_v2">> :=
#{<<"properties">> := Props}
} =
emqx_bridge_v2_testlib:actions_api_spec_schemas(),
Props.
%%------------------------------------------------------------------------------
%% Testcases
%%------------------------------------------------------------------------------
t_create_remove_list(Config) ->
[] = emqx_bridge_v2:list(),
ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, test_connector, ConnectorConfig),
ActionConfig = action(<<"test_connector">>),
{ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig),
[ActionInfo] = emqx_bridge_v2:list(),
#{
name := <<"test_action_1">>,
type := <<"elasticsearch">>,
raw_config := _RawConfig
} = ActionInfo,
{ok, _} = emqx_bridge_v2:create(?TYPE, test_action_2, ActionConfig),
2 = length(emqx_bridge_v2:list()),
ok = emqx_bridge_v2:remove(?TYPE, test_action_1),
1 = length(emqx_bridge_v2:list()),
ok = emqx_bridge_v2:remove(?TYPE, test_action_2),
[] = emqx_bridge_v2:list(),
emqx_connector:remove(?TYPE, test_connector),
ok.
%% Test sending a message to a bridge V2
t_send_message(Config) ->
ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, test_connector2, ConnectorConfig),
ActionConfig = action(<<"test_connector2">>),
{ok, _} = emqx_bridge_v2:create(?TYPE, test_action_1, ActionConfig),
%% Use the action to send a message
check_send_message_with_action(test_action_1, test_connector2),
%% Create a few more bridges with the same connector and test them
BridgeNames1 = [
list_to_atom("test_bridge_v2_" ++ integer_to_list(I))
|| I <- lists:seq(2, 10)
],
lists:foreach(
fun(BridgeName) ->
{ok, _} = emqx_bridge_v2:create(?TYPE, BridgeName, ActionConfig),
check_send_message_with_action(BridgeName, test_connector2)
end,
BridgeNames1
),
BridgeNames = [test_bridge_v2_1 | BridgeNames1],
%% Send more messages to the bridges
lists:foreach(
fun(BridgeName) ->
lists:foreach(
fun(_) ->
check_send_message_with_action(BridgeName, test_connector2)
end,
lists:seq(1, 10)
)
end,
BridgeNames
),
%% Remove all the bridges
lists:foreach(
fun(BridgeName) ->
ok = emqx_bridge_v2:remove(?TYPE, BridgeName)
end,
BridgeNames
),
emqx_connector:remove(?TYPE, test_connector2),
ok.
%% Test that we can get the status of the bridge V2
t_health_check(Config) ->
BridgeV2Config = action(<<"test_connector3">>),
ConnectorConfig = connector_config(Config),
{ok, _} = emqx_connector:create(?TYPE, test_connector3, ConnectorConfig),
{ok, _} = emqx_bridge_v2:create(?TYPE, test_bridge_v2, BridgeV2Config),
#{status := connected} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2),
ok = emqx_bridge_v2:remove(?TYPE, test_bridge_v2),
%% Check behaviour when bridge does not exist
{error, bridge_not_found} = emqx_bridge_v2:health_check(?TYPE, test_bridge_v2),
ok = emqx_connector:remove(?TYPE, test_connector3),
ok.
t_bad_url(Config) ->
ConnectorName = <<"test_connector">>,
ActionName = <<"test_action">>,
ActionConfig = action(<<"test_connector">>),
ConnectorConfig0 = connector_config(Config),
ConnectorConfig = ConnectorConfig0#{<<"base_url">> := <<"bad_host:9092">>},
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
?assertMatch(
{ok, #{
resource_data :=
#{
status := ?status_disconnected,
error := failed_to_start_elasticsearch_bridge
}
}},
emqx_connector:lookup(?TYPE, ConnectorName)
),
?assertMatch({ok, #{status := ?status_disconnected}}, emqx_bridge_v2:lookup(?TYPE, ActionName)),
ok.
t_parameters_key_api_spec(_Config) ->
ActionProps = action_api_spec_props_for_get(),
?assertNot(is_map_key(<<"elasticsearch">>, ActionProps), #{action_props => ActionProps}),
?assert(is_map_key(<<"parameters">>, ActionProps), #{action_props => ActionProps}),
ok.
t_http_api_get(Config) ->
ConnectorName = <<"test_connector">>,
ActionName = <<"test_action">>,
ActionConfig = action(ConnectorName),
ConnectorConfig = connector_config(Config),
?assertMatch({ok, _}, create_connector(ConnectorName, ConnectorConfig)),
?assertMatch({ok, _}, create_action(ActionName, ActionConfig)),
?assertMatch(
{ok,
{{_, 200, _}, _, [
#{
<<"connector">> := ConnectorName,
<<"description">> := <<"My elasticsearch test action">>,
<<"enable">> := true,
<<"error">> := <<>>,
<<"name">> := ActionName,
<<"node_status">> :=
[
#{
<<"node">> := _,
<<"status">> := <<"connected">>,
<<"status_reason">> := <<>>
}
],
<<"parameters">> :=
#{
<<"action">> := <<"create">>,
<<"doc">> := <<"${payload.doc}">>,
<<"index">> := <<"${payload.index}">>,
<<"max_retries">> := 2,
<<"overwrite">> := true
},
<<"resource_opts">> := #{<<"query_mode">> := <<"async">>},
<<"status">> := <<"connected">>,
<<"status_reason">> := <<>>,
<<"type">> := <<"elasticsearch">>
}
]}},
emqx_bridge_v2_testlib:list_bridges_api()
),
ok.

View File

@ -0,0 +1,20 @@
-----BEGIN CERTIFICATE-----
MIIDSjCCAjKgAwIBAgIVAIrN275DCtGnotTPpxwvQ5751N4OMA0GCSqGSIb3DQEB
CwUAMDQxMjAwBgNVBAMTKUVsYXN0aWMgQ2VydGlmaWNhdGUgVG9vbCBBdXRvZ2Vu
ZXJhdGVkIENBMB4XDTI0MDExNjAyMzIyMFoXDTI3MDExNTAyMzIyMFowNDEyMDAG
A1UEAxMpRWxhc3RpYyBDZXJ0aWZpY2F0ZSBUb29sIEF1dG9nZW5lcmF0ZWQgQ0Ew
ggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQCy0nwiEurUkIPFMLV1weVM
pPk/AlwZUzqjkeL44gsY53XI9Q05w/sL9u6PzwrXgTCFWNXzI9+MoAtp8phPkn14
cmg5/3sLe9YcFVFjYK/MoljlUbPDj+4dgk8l+w5FRSi0+JN5krUm7rYk9lojAkeS
fX8RU7ekKGbjBXIFtPxX5GNadu9RidR5GkHM3XroAIoris8bFOzMgFn9iybYnkhq
0S+Hpv0A8FVxzle0KNbPpsIkxXH2DnP2iPTDym9xJNl9Iv9MPtj9XaamH7TmXcSt
MbjkAudKsCw4bRuhHonM16DIUr8sX5UcRcAWyJ1x1qpZaOzMdh2VdYAHNuOsZwzJ
AgMBAAGjUzBRMB0GA1UdDgQWBBTAyDlp8NZfPe8NCGVlHJSVclGOhTAfBgNVHSME
GDAWgBTAyDlp8NZfPe8NCGVlHJSVclGOhTAPBgNVHRMBAf8EBTADAQH/MA0GCSqG
SIb3DQEBCwUAA4IBAQAeIUXRKmC53iirY4P49YspLafspAMf4ndMFQAp+Oc223Vs
hQC4axNoYnUdzWDH6LioAN7P826xNPqtXvTZF9fmeX7K8Nm9Kdj+for+QQI3j6+X
zq98VVkACb8b/Mc9Nac/WBbv/1IKyKgNNta7//WNPgAFolOfti/C0NLsPcKhrM9L
mGbvRX8ZjH8pVJ0YTy4/xfDcF7G/Lxl4Yvb0ZXpuQbvE1+Y0h5aoTNshT/skJxC4
iyVseYr21s3pptKcr6H9KZuSdZe5pbEo+81nT15w+50aswFLk9GCYh5UsQ+1jkRK
cKgxP93i6x8BVbQJGKi1A1jhauSKX2IpWZQsHy4p
-----END CERTIFICATE-----

View File

@ -150,6 +150,9 @@ post_config_update([?ROOT_KEY, Type, Name], '$remove', _, _OldConf, _AppEnvs) ->
ok = emqx_connector_resource:remove(Type, Name),
?tp(connector_post_config_update_done, #{}),
ok;
{error, not_found} ->
?tp(connector_post_config_update_done, #{}),
ok;
{ok, Channels} ->
{error, {active_channels, Channels}}
end;

View File

@ -97,7 +97,7 @@ config_parameters_require_alias.label:
"""_require_alias"""
config_parameters_doc.desc:
"""JSON document"""
"""JSON document. If undefined, rule engine will use JSON format to serialize all visible inputs, such as clientid, topic, payload etc."""
config_parameters_doc.label:
"""doc"""

View File

@ -246,6 +246,9 @@ for dep in ${CT_DEPS}; do
otel)
FILES+=( '.ci/docker-compose-file/docker-compose-otel.yaml' )
;;
elasticsearch)
FILES+=( '.ci/docker-compose-file/docker-compose-elastic-search-tls.yaml' )
;;
*)
echo "unknown_ct_dependency $dep"
exit 1