test(influxdb): increase influxdb bridge/connector coverage (ee5.0)
This commit is contained in:
parent
b6445cdc14
commit
04588148b7
|
@ -0,0 +1,36 @@
|
||||||
|
version: '3.9'
|
||||||
|
|
||||||
|
services:
|
||||||
|
influxdb_server_tcp:
|
||||||
|
container_name: influxdb_tcp
|
||||||
|
image: influxdb:${INFLUXDB_TAG}
|
||||||
|
expose:
|
||||||
|
- "8086"
|
||||||
|
- "8089/udp"
|
||||||
|
- "8083"
|
||||||
|
# ports:
|
||||||
|
# - "8086:8086"
|
||||||
|
environment:
|
||||||
|
DOCKER_INFLUXDB_INIT_MODE: setup
|
||||||
|
DOCKER_INFLUXDB_INIT_USERNAME: root
|
||||||
|
DOCKER_INFLUXDB_INIT_PASSWORD: emqx@123
|
||||||
|
DOCKER_INFLUXDB_INIT_ORG: emqx
|
||||||
|
DOCKER_INFLUXDB_INIT_BUCKET: mqtt
|
||||||
|
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: abcdefg
|
||||||
|
volumes:
|
||||||
|
- "./influxdb/setup-v1.sh:/docker-entrypoint-initdb.d/setup-v1.sh"
|
||||||
|
restart: always
|
||||||
|
networks:
|
||||||
|
- emqx_bridge
|
||||||
|
|
||||||
|
# networks:
|
||||||
|
# emqx_bridge:
|
||||||
|
# driver: bridge
|
||||||
|
# name: emqx_bridge
|
||||||
|
# ipam:
|
||||||
|
# driver: default
|
||||||
|
# config:
|
||||||
|
# - subnet: 172.100.239.0/24
|
||||||
|
# gateway: 172.100.239.1
|
||||||
|
# - subnet: 2001:3200:3200::/64
|
||||||
|
# gateway: 2001:3200:3200::1
|
|
@ -0,0 +1,42 @@
|
||||||
|
version: '3.9'
|
||||||
|
|
||||||
|
services:
|
||||||
|
influxdb_server_tls:
|
||||||
|
container_name: influxdb_tls
|
||||||
|
image: influxdb:${INFLUXDB_TAG}
|
||||||
|
expose:
|
||||||
|
- "8086"
|
||||||
|
- "8089/udp"
|
||||||
|
- "8083"
|
||||||
|
# ports:
|
||||||
|
# - "8087:8086"
|
||||||
|
environment:
|
||||||
|
DOCKER_INFLUXDB_INIT_MODE: setup
|
||||||
|
DOCKER_INFLUXDB_INIT_USERNAME: root
|
||||||
|
DOCKER_INFLUXDB_INIT_PASSWORD: emqx@123
|
||||||
|
DOCKER_INFLUXDB_INIT_ORG: emqx
|
||||||
|
DOCKER_INFLUXDB_INIT_BUCKET: mqtt
|
||||||
|
DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: abcdefg
|
||||||
|
volumes:
|
||||||
|
- ./certs/server.crt:/etc/influxdb/cert.pem
|
||||||
|
- ./certs/server.key:/etc/influxdb/key.pem
|
||||||
|
- "./influxdb/setup-v1.sh:/docker-entrypoint-initdb.d/setup-v1.sh"
|
||||||
|
command:
|
||||||
|
- influxd
|
||||||
|
- --tls-cert=/etc/influxdb/cert.pem
|
||||||
|
- --tls-key=/etc/influxdb/key.pem
|
||||||
|
restart: always
|
||||||
|
networks:
|
||||||
|
- emqx_bridge
|
||||||
|
|
||||||
|
# networks:
|
||||||
|
# emqx_bridge:
|
||||||
|
# driver: bridge
|
||||||
|
# name: emqx_bridge
|
||||||
|
# ipam:
|
||||||
|
# driver: default
|
||||||
|
# config:
|
||||||
|
# - subnet: 172.100.239.0/24
|
||||||
|
# gateway: 172.100.239.1
|
||||||
|
# - subnet: 2001:3200:3200::/64
|
||||||
|
# gateway: 2001:3200:3200::1
|
|
@ -0,0 +1,18 @@
|
||||||
|
version: '3.9'
|
||||||
|
|
||||||
|
services:
|
||||||
|
toxiproxy:
|
||||||
|
container_name: toxiproxy
|
||||||
|
image: ghcr.io/shopify/toxiproxy:2.5.0
|
||||||
|
restart: always
|
||||||
|
networks:
|
||||||
|
- emqx_bridge
|
||||||
|
volumes:
|
||||||
|
- "./toxiproxy.json:/config/toxiproxy.json"
|
||||||
|
ports:
|
||||||
|
- 8474:8474
|
||||||
|
- 8086:8086
|
||||||
|
- 8087:8087
|
||||||
|
command:
|
||||||
|
- "-host=0.0.0.0"
|
||||||
|
- "-config=/config/toxiproxy.json"
|
|
@ -0,0 +1,16 @@
|
||||||
|
#!/usr/bin/env bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
|
||||||
|
# influx v1 dbrp create \
|
||||||
|
# --bucket-id ${DOCKER_INFLUXDB_INIT_BUCKET_ID} \
|
||||||
|
# --db ${V1_DB_NAME} \
|
||||||
|
# --rp ${V1_RP_NAME} \
|
||||||
|
# --default \
|
||||||
|
# --org ${DOCKER_INFLUXDB_INIT_ORG}
|
||||||
|
|
||||||
|
influx v1 auth create \
|
||||||
|
--username "${DOCKER_INFLUXDB_INIT_USERNAME}" \
|
||||||
|
--password "${DOCKER_INFLUXDB_INIT_PASSWORD}" \
|
||||||
|
--write-bucket "${DOCKER_INFLUXDB_INIT_BUCKET_ID}" \
|
||||||
|
--org "${DOCKER_INFLUXDB_INIT_ORG}"
|
|
@ -0,0 +1,14 @@
|
||||||
|
[
|
||||||
|
{
|
||||||
|
"name": "influxdb_tcp",
|
||||||
|
"listen": "0.0.0.0:8086",
|
||||||
|
"upstream": "influxdb_tcp:8086",
|
||||||
|
"enabled": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "influxdb_tls",
|
||||||
|
"listen": "0.0.0.0:8087",
|
||||||
|
"upstream": "influxdb_tls:8086",
|
||||||
|
"enabled": true
|
||||||
|
}
|
||||||
|
]
|
|
@ -120,6 +120,7 @@ jobs:
|
||||||
MYSQL_TAG: 8
|
MYSQL_TAG: 8
|
||||||
PGSQL_TAG: 13
|
PGSQL_TAG: 13
|
||||||
REDIS_TAG: 6
|
REDIS_TAG: 6
|
||||||
|
INFLUXDB_TAG: 2.5.0
|
||||||
run: |
|
run: |
|
||||||
./scripts/ct/run.sh --app ${{ matrix.app_name }}
|
./scripts/ct/run.sh --app ${{ matrix.app_name }}
|
||||||
- uses: actions/upload-artifact@v1
|
- uses: actions/upload-artifact@v1
|
||||||
|
|
4
Makefile
4
Makefile
|
@ -83,7 +83,11 @@ $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app))))
|
||||||
.PHONY: ct-suite
|
.PHONY: ct-suite
|
||||||
ct-suite: $(REBAR)
|
ct-suite: $(REBAR)
|
||||||
ifneq ($(TESTCASE),)
|
ifneq ($(TESTCASE),)
|
||||||
|
ifneq ($(GROUP),)
|
||||||
|
$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE) --group $(GROUP)
|
||||||
|
else
|
||||||
$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE)
|
$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE)
|
||||||
|
endif
|
||||||
else ifneq ($(GROUP),)
|
else ifneq ($(GROUP),)
|
||||||
$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --group $(GROUP)
|
$(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --group $(GROUP)
|
||||||
else
|
else
|
||||||
|
|
|
@ -65,6 +65,15 @@
|
||||||
stop_slave/1
|
stop_slave/1
|
||||||
]).
|
]).
|
||||||
|
|
||||||
|
-export([clear_screen/0]).
|
||||||
|
-export([with_mock/4]).
|
||||||
|
|
||||||
|
%% Toxiproxy API
|
||||||
|
-export([
|
||||||
|
with_failure/5,
|
||||||
|
reset_proxy/2
|
||||||
|
]).
|
||||||
|
|
||||||
-define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])).
|
-define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])).
|
||||||
|
|
||||||
-define(MQTT_SSL_TWOWAY, [
|
-define(MQTT_SSL_TWOWAY, [
|
||||||
|
@ -769,3 +778,129 @@ expand_node_specs(Specs, CommonOpts) ->
|
||||||
end,
|
end,
|
||||||
Specs
|
Specs
|
||||||
).
|
).
|
||||||
|
|
||||||
|
%% is useful when iterating on the tests in a loop, to get rid of all
|
||||||
|
%% the garbaged printed before the test itself beings.
|
||||||
|
clear_screen() ->
|
||||||
|
io:format(standard_io, "\033[H\033[2J", []),
|
||||||
|
io:format(standard_error, "\033[H\033[2J", []),
|
||||||
|
io:format(standard_io, "\033[H\033[3J", []),
|
||||||
|
io:format(standard_error, "\033[H\033[3J", []),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
with_mock(Mod, FnName, MockedFn, Fun) ->
|
||||||
|
ok = meck:new(Mod, [non_strict, no_link, no_history, passthrough]),
|
||||||
|
ok = meck:expect(Mod, FnName, MockedFn),
|
||||||
|
try
|
||||||
|
Fun()
|
||||||
|
after
|
||||||
|
ok = meck:unload(Mod)
|
||||||
|
end.
|
||||||
|
|
||||||
|
%%-------------------------------------------------------------------------------
|
||||||
|
%% Toxiproxy utils
|
||||||
|
%%-------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
reset_proxy(ProxyHost, ProxyPort) ->
|
||||||
|
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset",
|
||||||
|
Body = <<>>,
|
||||||
|
{ok, {{_, 204, _}, _, _}} = httpc:request(
|
||||||
|
post,
|
||||||
|
{Url, [], "application/json", Body},
|
||||||
|
[],
|
||||||
|
[{body_format, binary}]
|
||||||
|
).
|
||||||
|
|
||||||
|
with_failure(FailureType, Name, ProxyHost, ProxyPort, Fun) ->
|
||||||
|
enable_failure(FailureType, Name, ProxyHost, ProxyPort),
|
||||||
|
try
|
||||||
|
Fun()
|
||||||
|
after
|
||||||
|
heal_failure(FailureType, Name, ProxyHost, ProxyPort)
|
||||||
|
end.
|
||||||
|
|
||||||
|
enable_failure(FailureType, Name, ProxyHost, ProxyPort) ->
|
||||||
|
case FailureType of
|
||||||
|
down -> switch_proxy(off, Name, ProxyHost, ProxyPort);
|
||||||
|
timeout -> timeout_proxy(on, Name, ProxyHost, ProxyPort);
|
||||||
|
latency_up -> latency_up_proxy(on, Name, ProxyHost, ProxyPort)
|
||||||
|
end.
|
||||||
|
|
||||||
|
heal_failure(FailureType, Name, ProxyHost, ProxyPort) ->
|
||||||
|
case FailureType of
|
||||||
|
down -> switch_proxy(on, Name, ProxyHost, ProxyPort);
|
||||||
|
timeout -> timeout_proxy(off, Name, ProxyHost, ProxyPort);
|
||||||
|
latency_up -> latency_up_proxy(off, Name, ProxyHost, ProxyPort)
|
||||||
|
end.
|
||||||
|
|
||||||
|
switch_proxy(Switch, Name, ProxyHost, ProxyPort) ->
|
||||||
|
Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name,
|
||||||
|
Body =
|
||||||
|
case Switch of
|
||||||
|
off -> <<"{\"enabled\":false}">>;
|
||||||
|
on -> <<"{\"enabled\":true}">>
|
||||||
|
end,
|
||||||
|
{ok, {{_, 200, _}, _, _}} = httpc:request(
|
||||||
|
post,
|
||||||
|
{Url, [], "application/json", Body},
|
||||||
|
[],
|
||||||
|
[{body_format, binary}]
|
||||||
|
).
|
||||||
|
|
||||||
|
timeout_proxy(on, Name, ProxyHost, ProxyPort) ->
|
||||||
|
Url =
|
||||||
|
"http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
|
||||||
|
"/toxics",
|
||||||
|
NameBin = list_to_binary(Name),
|
||||||
|
Body =
|
||||||
|
<<"{\"name\":\"", NameBin/binary,
|
||||||
|
"_timeout\",\"type\":\"timeout\","
|
||||||
|
"\"stream\":\"upstream\",\"toxicity\":1.0,"
|
||||||
|
"\"attributes\":{\"timeout\":0}}">>,
|
||||||
|
{ok, {{_, 200, _}, _, _}} = httpc:request(
|
||||||
|
post,
|
||||||
|
{Url, [], "application/json", Body},
|
||||||
|
[],
|
||||||
|
[{body_format, binary}]
|
||||||
|
);
|
||||||
|
timeout_proxy(off, Name, ProxyHost, ProxyPort) ->
|
||||||
|
ToxicName = Name ++ "_timeout",
|
||||||
|
Url =
|
||||||
|
"http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
|
||||||
|
"/toxics/" ++ ToxicName,
|
||||||
|
Body = <<>>,
|
||||||
|
{ok, {{_, 204, _}, _, _}} = httpc:request(
|
||||||
|
delete,
|
||||||
|
{Url, [], "application/json", Body},
|
||||||
|
[],
|
||||||
|
[{body_format, binary}]
|
||||||
|
).
|
||||||
|
|
||||||
|
latency_up_proxy(on, Name, ProxyHost, ProxyPort) ->
|
||||||
|
Url =
|
||||||
|
"http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
|
||||||
|
"/toxics",
|
||||||
|
NameBin = list_to_binary(Name),
|
||||||
|
Body =
|
||||||
|
<<"{\"name\":\"", NameBin/binary,
|
||||||
|
"_latency_up\",\"type\":\"latency\","
|
||||||
|
"\"stream\":\"upstream\",\"toxicity\":1.0,"
|
||||||
|
"\"attributes\":{\"latency\":20000,\"jitter\":3000}}">>,
|
||||||
|
{ok, {{_, 200, _}, _, _}} = httpc:request(
|
||||||
|
post,
|
||||||
|
{Url, [], "application/json", Body},
|
||||||
|
[],
|
||||||
|
[{body_format, binary}]
|
||||||
|
);
|
||||||
|
latency_up_proxy(off, Name, ProxyHost, ProxyPort) ->
|
||||||
|
ToxicName = Name ++ "_latency_up",
|
||||||
|
Url =
|
||||||
|
"http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++
|
||||||
|
"/toxics/" ++ ToxicName,
|
||||||
|
Body = <<>>,
|
||||||
|
{ok, {{_, 204, _}, _, _}} = httpc:request(
|
||||||
|
delete,
|
||||||
|
{Url, [], "application/json", Body},
|
||||||
|
[],
|
||||||
|
[{body_format, binary}]
|
||||||
|
).
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
influxdb
|
||||||
|
kafka
|
||||||
mongo
|
mongo
|
||||||
mongo_rs_sharded
|
mongo_rs_sharded
|
||||||
kafka
|
|
||||||
|
|
|
@ -0,0 +1,863 @@
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved.
|
||||||
|
%%--------------------------------------------------------------------
|
||||||
|
-module(emqx_ee_bridge_influxdb_SUITE).
|
||||||
|
|
||||||
|
-compile(nowarn_export_all).
|
||||||
|
-compile(export_all).
|
||||||
|
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
-include_lib("common_test/include/ct.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% CT boilerplate
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
all() ->
|
||||||
|
[
|
||||||
|
{group, with_batch},
|
||||||
|
{group, without_batch}
|
||||||
|
].
|
||||||
|
|
||||||
|
groups() ->
|
||||||
|
TCs = emqx_common_test_helpers:all(?MODULE),
|
||||||
|
[
|
||||||
|
{with_batch, [
|
||||||
|
{group, sync_query},
|
||||||
|
{group, async_query}
|
||||||
|
]},
|
||||||
|
{without_batch, [
|
||||||
|
{group, sync_query},
|
||||||
|
{group, async_query}
|
||||||
|
]},
|
||||||
|
{sync_query, [
|
||||||
|
{group, apiv1_tcp},
|
||||||
|
{group, apiv1_tls},
|
||||||
|
{group, apiv2_tcp},
|
||||||
|
{group, apiv2_tls}
|
||||||
|
]},
|
||||||
|
{async_query, [
|
||||||
|
{group, apiv1_tcp},
|
||||||
|
{group, apiv1_tls},
|
||||||
|
{group, apiv2_tcp},
|
||||||
|
{group, apiv2_tls}
|
||||||
|
]},
|
||||||
|
{apiv1_tcp, TCs},
|
||||||
|
{apiv1_tls, TCs},
|
||||||
|
{apiv2_tcp, TCs},
|
||||||
|
{apiv2_tls, TCs}
|
||||||
|
].
|
||||||
|
|
||||||
|
init_per_suite(Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_suite(_Config) ->
|
||||||
|
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
|
||||||
|
ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]),
|
||||||
|
_ = application:stop(emqx_connector),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_group(InfluxDBType, Config0) when
|
||||||
|
InfluxDBType =:= apiv1_tcp;
|
||||||
|
InfluxDBType =:= apiv1_tls
|
||||||
|
->
|
||||||
|
#{
|
||||||
|
host := InfluxDBHost,
|
||||||
|
port := InfluxDBPort,
|
||||||
|
use_tls := UseTLS,
|
||||||
|
proxy_name := ProxyName
|
||||||
|
} =
|
||||||
|
case InfluxDBType of
|
||||||
|
apiv1_tcp ->
|
||||||
|
#{
|
||||||
|
host => os:getenv("INFLUXDB_APIV1_TCP_HOST", "toxiproxy"),
|
||||||
|
port => list_to_integer(os:getenv("INFLUXDB_APIV1_TCP_PORT", "8086")),
|
||||||
|
use_tls => false,
|
||||||
|
proxy_name => "influxdb_tcp"
|
||||||
|
};
|
||||||
|
apiv1_tls ->
|
||||||
|
#{
|
||||||
|
host => os:getenv("INFLUXDB_APIV1_TLS_HOST", "toxiproxy"),
|
||||||
|
port => list_to_integer(os:getenv("INFLUXDB_APIV1_TLS_PORT", "8087")),
|
||||||
|
use_tls => true,
|
||||||
|
proxy_name => "influxdb_tls"
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
case emqx_common_test_helpers:is_tcp_server_available(InfluxDBHost, InfluxDBPort) of
|
||||||
|
true ->
|
||||||
|
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||||
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||||
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
|
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
|
Config = [{use_tls, UseTLS} | Config0],
|
||||||
|
{Name, ConfigString, InfluxDBConfig} = influxdb_config(
|
||||||
|
apiv1, InfluxDBHost, InfluxDBPort, Config
|
||||||
|
),
|
||||||
|
EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv1">>,
|
||||||
|
EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin),
|
||||||
|
{EHttpcTransport, EHttpcTransportOpts} =
|
||||||
|
case UseTLS of
|
||||||
|
true -> {tls, [{verify, verify_none}]};
|
||||||
|
false -> {tcp, []}
|
||||||
|
end,
|
||||||
|
EHttpcPoolOpts = [
|
||||||
|
{host, InfluxDBHost},
|
||||||
|
{port, InfluxDBPort},
|
||||||
|
{pool_size, 1},
|
||||||
|
{transport, EHttpcTransport},
|
||||||
|
{transport_opts, EHttpcTransportOpts}
|
||||||
|
],
|
||||||
|
{ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts),
|
||||||
|
[
|
||||||
|
{proxy_host, ProxyHost},
|
||||||
|
{proxy_port, ProxyPort},
|
||||||
|
{proxy_name, ProxyName},
|
||||||
|
{influxdb_host, InfluxDBHost},
|
||||||
|
{influxdb_port, InfluxDBPort},
|
||||||
|
{influxdb_type, apiv1},
|
||||||
|
{influxdb_config, InfluxDBConfig},
|
||||||
|
{influxdb_config_string, ConfigString},
|
||||||
|
{ehttpc_pool_name, EHttpcPoolName},
|
||||||
|
{influxdb_name, Name}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
|
false ->
|
||||||
|
{skip, no_influxdb}
|
||||||
|
end;
|
||||||
|
init_per_group(InfluxDBType, Config0) when
|
||||||
|
InfluxDBType =:= apiv2_tcp;
|
||||||
|
InfluxDBType =:= apiv2_tls
|
||||||
|
->
|
||||||
|
#{
|
||||||
|
host := InfluxDBHost,
|
||||||
|
port := InfluxDBPort,
|
||||||
|
use_tls := UseTLS,
|
||||||
|
proxy_name := ProxyName
|
||||||
|
} =
|
||||||
|
case InfluxDBType of
|
||||||
|
apiv2_tcp ->
|
||||||
|
#{
|
||||||
|
host => os:getenv("INFLUXDB_APIV2_TCP_HOST", "toxiproxy"),
|
||||||
|
port => list_to_integer(os:getenv("INFLUXDB_APIV2_TCP_PORT", "8086")),
|
||||||
|
use_tls => false,
|
||||||
|
proxy_name => "influxdb_tcp"
|
||||||
|
};
|
||||||
|
apiv2_tls ->
|
||||||
|
#{
|
||||||
|
host => os:getenv("INFLUXDB_APIV2_TLS_HOST", "toxiproxy"),
|
||||||
|
port => list_to_integer(os:getenv("INFLUXDB_APIV2_TLS_PORT", "8087")),
|
||||||
|
use_tls => true,
|
||||||
|
proxy_name => "influxdb_tls"
|
||||||
|
}
|
||||||
|
end,
|
||||||
|
case emqx_common_test_helpers:is_tcp_server_available(InfluxDBHost, InfluxDBPort) of
|
||||||
|
true ->
|
||||||
|
ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"),
|
||||||
|
ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")),
|
||||||
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
|
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
|
||||||
|
ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]),
|
||||||
|
{ok, _} = application:ensure_all_started(emqx_connector),
|
||||||
|
Config = [{use_tls, UseTLS} | Config0],
|
||||||
|
{Name, ConfigString, InfluxDBConfig} = influxdb_config(
|
||||||
|
apiv2, InfluxDBHost, InfluxDBPort, Config
|
||||||
|
),
|
||||||
|
EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv2">>,
|
||||||
|
EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin),
|
||||||
|
{EHttpcTransport, EHttpcTransportOpts} =
|
||||||
|
case UseTLS of
|
||||||
|
true -> {tls, [{verify, verify_none}]};
|
||||||
|
false -> {tcp, []}
|
||||||
|
end,
|
||||||
|
EHttpcPoolOpts = [
|
||||||
|
{host, InfluxDBHost},
|
||||||
|
{port, InfluxDBPort},
|
||||||
|
{pool_size, 1},
|
||||||
|
{transport, EHttpcTransport},
|
||||||
|
{transport_opts, EHttpcTransportOpts}
|
||||||
|
],
|
||||||
|
{ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts),
|
||||||
|
[
|
||||||
|
{proxy_host, ProxyHost},
|
||||||
|
{proxy_port, ProxyPort},
|
||||||
|
{proxy_name, ProxyName},
|
||||||
|
{influxdb_host, InfluxDBHost},
|
||||||
|
{influxdb_port, InfluxDBPort},
|
||||||
|
{influxdb_type, apiv2},
|
||||||
|
{influxdb_config, InfluxDBConfig},
|
||||||
|
{influxdb_config_string, ConfigString},
|
||||||
|
{ehttpc_pool_name, EHttpcPoolName},
|
||||||
|
{influxdb_name, Name}
|
||||||
|
| Config
|
||||||
|
];
|
||||||
|
false ->
|
||||||
|
{skip, no_influxdb}
|
||||||
|
end;
|
||||||
|
init_per_group(sync_query, Config) ->
|
||||||
|
[{query_mode, sync} | Config];
|
||||||
|
init_per_group(async_query, Config) ->
|
||||||
|
[{query_mode, async} | Config];
|
||||||
|
init_per_group(with_batch, Config) ->
|
||||||
|
[{enable_batch, true} | Config];
|
||||||
|
init_per_group(without_batch, Config) ->
|
||||||
|
[{enable_batch, false} | Config];
|
||||||
|
init_per_group(_Group, Config) ->
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_group(Group, Config) when
|
||||||
|
Group =:= apiv1_tcp;
|
||||||
|
Group =:= apiv1_tls;
|
||||||
|
Group =:= apiv2_tcp;
|
||||||
|
Group =:= apiv2_tls
|
||||||
|
->
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
EHttpcPoolName = ?config(ehttpc_pool_name, Config),
|
||||||
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
|
ehttpc_sup:stop_pool(EHttpcPoolName),
|
||||||
|
delete_bridge(Config),
|
||||||
|
ok;
|
||||||
|
end_per_group(_Group, _Config) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
init_per_testcase(_Testcase, Config) ->
|
||||||
|
%% catch clear_db(Config),
|
||||||
|
%% delete_bridge(Config),
|
||||||
|
delete_all_bridges(),
|
||||||
|
Config.
|
||||||
|
|
||||||
|
end_per_testcase(_Testcase, Config) ->
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort),
|
||||||
|
%% catch clear_db(Config),
|
||||||
|
%% delete_bridge(Config),
|
||||||
|
delete_all_bridges(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Helper fns
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
example_write_syntax() ->
|
||||||
|
%% N.B.: this single space character is relevant
|
||||||
|
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
|
||||||
|
"${clientid}_int_value=${payload.int_key}i,",
|
||||||
|
"uint_value=${payload.uint_key}u,"
|
||||||
|
"float_value=${payload.float_key},", "undef_value=${payload.undef},",
|
||||||
|
"${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>.
|
||||||
|
|
||||||
|
influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) ->
|
||||||
|
EnableBatch = proplists:get_value(enable_batch, Config, true),
|
||||||
|
QueryMode = proplists:get_value(query_mode, Config, sync),
|
||||||
|
UseTLS = proplists:get_value(use_tls, Config, false),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
WriteSyntax = example_write_syntax(),
|
||||||
|
ConfigString =
|
||||||
|
io_lib:format(
|
||||||
|
"bridges.influxdb_api_v1.~s {\n"
|
||||||
|
" enable = true\n"
|
||||||
|
" server = \"~p:~b\"\n"
|
||||||
|
" database = mqtt\n"
|
||||||
|
" username = root\n"
|
||||||
|
" password = emqx@123\n"
|
||||||
|
" precision = ns\n"
|
||||||
|
" write_syntax = \"~s\"\n"
|
||||||
|
" resource_opts = {\n"
|
||||||
|
" enable_batch = ~p\n"
|
||||||
|
" query_mode = ~s\n"
|
||||||
|
" }\n"
|
||||||
|
" ssl {\n"
|
||||||
|
" enable = ~p\n"
|
||||||
|
" verify = verify_none\n"
|
||||||
|
" }\n"
|
||||||
|
"}\n",
|
||||||
|
[Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS]
|
||||||
|
),
|
||||||
|
{Name, ConfigString, parse_and_check(ConfigString, Type, Name)};
|
||||||
|
influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) ->
|
||||||
|
EnableBatch = proplists:get_value(enable_batch, Config, true),
|
||||||
|
QueryMode = proplists:get_value(query_mode, Config, sync),
|
||||||
|
UseTLS = proplists:get_value(use_tls, Config, false),
|
||||||
|
Name = atom_to_binary(?MODULE),
|
||||||
|
WriteSyntax = example_write_syntax(),
|
||||||
|
ConfigString =
|
||||||
|
io_lib:format(
|
||||||
|
"bridges.influxdb_api_v2.~s {\n"
|
||||||
|
" enable = true\n"
|
||||||
|
" server = \"~p:~b\"\n"
|
||||||
|
" bucket = mqtt\n"
|
||||||
|
" org = emqx\n"
|
||||||
|
" token = abcdefg\n"
|
||||||
|
" precision = ns\n"
|
||||||
|
" write_syntax = \"~s\"\n"
|
||||||
|
" resource_opts = {\n"
|
||||||
|
" enable_batch = ~p\n"
|
||||||
|
" query_mode = ~s\n"
|
||||||
|
" }\n"
|
||||||
|
" ssl {\n"
|
||||||
|
" enable = ~p\n"
|
||||||
|
" verify = verify_none\n"
|
||||||
|
" }\n"
|
||||||
|
"}\n",
|
||||||
|
[Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS]
|
||||||
|
),
|
||||||
|
{Name, ConfigString, parse_and_check(ConfigString, Type, Name)}.
|
||||||
|
|
||||||
|
parse_and_check(ConfigString, Type, Name) ->
|
||||||
|
{ok, RawConf} = hocon:binary(ConfigString, #{format => map}),
|
||||||
|
TypeBin = influxdb_type_bin(Type),
|
||||||
|
hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}),
|
||||||
|
#{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf,
|
||||||
|
Config.
|
||||||
|
|
||||||
|
influxdb_type_bin(apiv1) ->
|
||||||
|
<<"influxdb_api_v1">>;
|
||||||
|
influxdb_type_bin(apiv2) ->
|
||||||
|
<<"influxdb_api_v2">>.
|
||||||
|
|
||||||
|
create_bridge(Config) ->
|
||||||
|
Type = influxdb_type_bin(?config(influxdb_type, Config)),
|
||||||
|
Name = ?config(influxdb_name, Config),
|
||||||
|
InfluxDBConfig = ?config(influxdb_config, Config),
|
||||||
|
emqx_bridge:create(Type, Name, InfluxDBConfig).
|
||||||
|
|
||||||
|
delete_bridge(Config) ->
|
||||||
|
Type = influxdb_type_bin(?config(influxdb_type, Config)),
|
||||||
|
Name = ?config(influxdb_name, Config),
|
||||||
|
emqx_bridge:remove(Type, Name).
|
||||||
|
|
||||||
|
delete_all_bridges() ->
|
||||||
|
lists:foreach(
|
||||||
|
fun(#{name := Name, type := Type}) ->
|
||||||
|
emqx_bridge:remove(Type, Name)
|
||||||
|
end,
|
||||||
|
emqx_bridge:list()
|
||||||
|
).
|
||||||
|
|
||||||
|
send_message(Config, Payload) ->
|
||||||
|
Name = ?config(influxdb_name, Config),
|
||||||
|
Type = influxdb_type_bin(?config(influxdb_type, Config)),
|
||||||
|
BridgeId = emqx_bridge_resource:bridge_id(Type, Name),
|
||||||
|
emqx_bridge:send_message(BridgeId, Payload).
|
||||||
|
|
||||||
|
query_by_clientid(ClientId, Config) ->
|
||||||
|
InfluxDBHost = ?config(influxdb_host, Config),
|
||||||
|
InfluxDBPort = ?config(influxdb_port, Config),
|
||||||
|
EHttpcPoolName = ?config(ehttpc_pool_name, Config),
|
||||||
|
UseTLS = ?config(use_tls, Config),
|
||||||
|
Path = <<"/api/v2/query?org=emqx">>,
|
||||||
|
Scheme =
|
||||||
|
case UseTLS of
|
||||||
|
true -> <<"https://">>;
|
||||||
|
false -> <<"http://">>
|
||||||
|
end,
|
||||||
|
URI = iolist_to_binary([
|
||||||
|
Scheme,
|
||||||
|
list_to_binary(InfluxDBHost),
|
||||||
|
":",
|
||||||
|
integer_to_binary(InfluxDBPort),
|
||||||
|
Path
|
||||||
|
]),
|
||||||
|
Query =
|
||||||
|
<<
|
||||||
|
"from(bucket: \"mqtt\")\n"
|
||||||
|
" |> range(start: -12h)\n"
|
||||||
|
" |> filter(fn: (r) => r.clientid == \"",
|
||||||
|
ClientId/binary,
|
||||||
|
"\")"
|
||||||
|
>>,
|
||||||
|
Headers = [
|
||||||
|
{"Authorization", "Token abcdefg"},
|
||||||
|
{"Content-Type", "application/json"}
|
||||||
|
],
|
||||||
|
Body =
|
||||||
|
emqx_json:encode(#{
|
||||||
|
query => Query,
|
||||||
|
dialect => #{
|
||||||
|
header => true,
|
||||||
|
delimiter => <<";">>
|
||||||
|
}
|
||||||
|
}),
|
||||||
|
{ok, 200, _Headers, RawBody0} =
|
||||||
|
ehttpc:request(
|
||||||
|
EHttpcPoolName,
|
||||||
|
post,
|
||||||
|
{URI, Headers, Body},
|
||||||
|
_Timeout = 10_000,
|
||||||
|
_Retry = 0
|
||||||
|
),
|
||||||
|
RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)),
|
||||||
|
{ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}),
|
||||||
|
DecodedCSV1 = [
|
||||||
|
[Field || Field <- Line, Field =/= <<>>]
|
||||||
|
|| Line <- DecodedCSV0,
|
||||||
|
Line =/= [<<>>]
|
||||||
|
],
|
||||||
|
DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []),
|
||||||
|
index_by_field(DecodedCSV2).
|
||||||
|
|
||||||
|
decode_csv(RawBody) ->
|
||||||
|
Lines =
|
||||||
|
[
|
||||||
|
binary:split(Line, [<<";">>], [global, trim_all])
|
||||||
|
|| Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all])
|
||||||
|
],
|
||||||
|
csv_lines_to_maps(Lines, []).
|
||||||
|
|
||||||
|
csv_lines_to_maps([Fields, Data | Rest], Acc) ->
|
||||||
|
Map = maps:from_list(lists:zip(Fields, Data)),
|
||||||
|
csv_lines_to_maps(Rest, [Map | Acc]);
|
||||||
|
csv_lines_to_maps(_Data, Acc) ->
|
||||||
|
lists:reverse(Acc).
|
||||||
|
|
||||||
|
index_by_field(DecodedCSV) ->
|
||||||
|
maps:from_list([{Field, Data} || Data = #{<<"_field">> := Field} <- DecodedCSV]).
|
||||||
|
|
||||||
|
assert_persisted_data(ClientId, Expected, PersistedData) ->
|
||||||
|
ClientIdIntKey = <<ClientId/binary, "_int_value">>,
|
||||||
|
maps:foreach(
|
||||||
|
fun
|
||||||
|
(int_value, ExpectedValue) ->
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"_value">> := ExpectedValue},
|
||||||
|
maps:get(ClientIdIntKey, PersistedData)
|
||||||
|
);
|
||||||
|
(Key, ExpectedValue) ->
|
||||||
|
?assertMatch(
|
||||||
|
#{<<"_value">> := ExpectedValue},
|
||||||
|
maps:get(atom_to_binary(Key), PersistedData),
|
||||||
|
#{expected => ExpectedValue}
|
||||||
|
)
|
||||||
|
end,
|
||||||
|
Expected
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
resource_id(Config) ->
|
||||||
|
Type = influxdb_type_bin(?config(influxdb_type, Config)),
|
||||||
|
Name = ?config(influxdb_name, Config),
|
||||||
|
emqx_bridge_resource:resource_id(Type, Name).
|
||||||
|
|
||||||
|
instance_id(Config) ->
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
[{_, InstanceId}] = ets:lookup(emqx_resource_manager, {owner, ResourceId}),
|
||||||
|
InstanceId.
|
||||||
|
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
%% Testcases
|
||||||
|
%%------------------------------------------------------------------------------
|
||||||
|
|
||||||
|
t_start_ok(Config) ->
|
||||||
|
QueryMode = ?config(query_mode, Config),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Payload = #{
|
||||||
|
int_key => -123,
|
||||||
|
bool => true,
|
||||||
|
float_key => 24.5,
|
||||||
|
uint_key => 123
|
||||||
|
},
|
||||||
|
SentData = #{
|
||||||
|
<<"clientid">> => ClientId,
|
||||||
|
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
<<"timestamp">> => erlang:system_time(nanosecond),
|
||||||
|
<<"payload">> => Payload
|
||||||
|
},
|
||||||
|
?check_trace(
|
||||||
|
begin
|
||||||
|
?assertEqual(ok, send_message(Config, SentData)),
|
||||||
|
case QueryMode of
|
||||||
|
async -> ct:sleep(500);
|
||||||
|
sync -> ok
|
||||||
|
end,
|
||||||
|
PersistedData = query_by_clientid(ClientId, Config),
|
||||||
|
Expected = #{
|
||||||
|
bool => <<"true">>,
|
||||||
|
int_value => <<"-123">>,
|
||||||
|
uint_value => <<"123">>,
|
||||||
|
float_value => <<"24.5">>,
|
||||||
|
payload => emqx_json:encode(Payload)
|
||||||
|
},
|
||||||
|
assert_persisted_data(ClientId, Expected, PersistedData),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
fun(Trace0) ->
|
||||||
|
Trace = ?of_kind(influxdb_connector_send_query, Trace0),
|
||||||
|
?assertMatch([#{points := [_]}], Trace),
|
||||||
|
[#{points := [Point]}] = Trace,
|
||||||
|
ct:pal("sent point: ~p", [Point]),
|
||||||
|
?assertMatch(
|
||||||
|
#{
|
||||||
|
fields := #{},
|
||||||
|
measurement := <<_/binary>>,
|
||||||
|
tags := #{},
|
||||||
|
timestamp := TS
|
||||||
|
} when is_integer(TS),
|
||||||
|
Point
|
||||||
|
),
|
||||||
|
#{fields := Fields} = Point,
|
||||||
|
?assert(lists:all(fun is_binary/1, maps:keys(Fields))),
|
||||||
|
?assertNot(maps:is_key(<<"undefined">>, Fields)),
|
||||||
|
?assertNot(maps:is_key(<<"undef_value">>, Fields)),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_start_already_started(Config) ->
|
||||||
|
Type = influxdb_type_bin(?config(influxdb_type, Config)),
|
||||||
|
Name = ?config(influxdb_name, Config),
|
||||||
|
InfluxDBConfigString = ?config(influxdb_config_string, Config),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
InstanceId = instance_id(Config),
|
||||||
|
TypeAtom = binary_to_atom(Type),
|
||||||
|
NameAtom = binary_to_atom(Name),
|
||||||
|
{ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check(
|
||||||
|
emqx_bridge_schema, InfluxDBConfigString
|
||||||
|
),
|
||||||
|
?check_trace(
|
||||||
|
emqx_ee_connector_influxdb:on_start(InstanceId, InfluxDBConfigMap),
|
||||||
|
fun(Result, Trace) ->
|
||||||
|
?assertMatch({ok, _}, Result),
|
||||||
|
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_start_ok_timestamp_write_syntax(Config) ->
|
||||||
|
InfluxDBType = ?config(influxdb_type, Config),
|
||||||
|
InfluxDBName = ?config(influxdb_name, Config),
|
||||||
|
InfluxDBConfigString0 = ?config(influxdb_config_string, Config),
|
||||||
|
InfluxDBTypeCfg =
|
||||||
|
case InfluxDBType of
|
||||||
|
apiv1 -> "influxdb_api_v1";
|
||||||
|
apiv2 -> "influxdb_api_v2"
|
||||||
|
end,
|
||||||
|
WriteSyntax =
|
||||||
|
%% N.B.: this single space characters are relevant
|
||||||
|
<<"${topic},clientid=${clientid}", " ", "payload=${payload},",
|
||||||
|
"${clientid}_int_value=${payload.int_key}i,",
|
||||||
|
"uint_value=${payload.uint_key}u,"
|
||||||
|
"bool=${payload.bool}", " ", "${timestamp}">>,
|
||||||
|
%% append this to override the config
|
||||||
|
InfluxDBConfigString1 =
|
||||||
|
io_lib:format(
|
||||||
|
"bridges.~s.~s {\n"
|
||||||
|
" write_syntax = \"~s\"\n"
|
||||||
|
"}\n",
|
||||||
|
[InfluxDBTypeCfg, InfluxDBName, WriteSyntax]
|
||||||
|
),
|
||||||
|
InfluxDBConfig1 = parse_and_check(
|
||||||
|
InfluxDBConfigString0 ++ InfluxDBConfigString1,
|
||||||
|
InfluxDBType,
|
||||||
|
InfluxDBName
|
||||||
|
),
|
||||||
|
Config1 = [{influxdb_config, InfluxDBConfig1} | Config],
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config1)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_start_ok_no_subject_tags_write_syntax(Config) ->
|
||||||
|
InfluxDBType = ?config(influxdb_type, Config),
|
||||||
|
InfluxDBName = ?config(influxdb_name, Config),
|
||||||
|
InfluxDBConfigString0 = ?config(influxdb_config_string, Config),
|
||||||
|
InfluxDBTypeCfg =
|
||||||
|
case InfluxDBType of
|
||||||
|
apiv1 -> "influxdb_api_v1";
|
||||||
|
apiv2 -> "influxdb_api_v2"
|
||||||
|
end,
|
||||||
|
WriteSyntax =
|
||||||
|
%% N.B.: this single space characters are relevant
|
||||||
|
<<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
|
||||||
|
"uint_value=${payload.uint_key}u,"
|
||||||
|
"bool=${payload.bool}", " ", "${timestamp}">>,
|
||||||
|
%% append this to override the config
|
||||||
|
InfluxDBConfigString1 =
|
||||||
|
io_lib:format(
|
||||||
|
"bridges.~s.~s {\n"
|
||||||
|
" write_syntax = \"~s\"\n"
|
||||||
|
"}\n",
|
||||||
|
[InfluxDBTypeCfg, InfluxDBName, WriteSyntax]
|
||||||
|
),
|
||||||
|
InfluxDBConfig1 = parse_and_check(
|
||||||
|
InfluxDBConfigString0 ++ InfluxDBConfigString1,
|
||||||
|
InfluxDBType,
|
||||||
|
InfluxDBName
|
||||||
|
),
|
||||||
|
Config1 = [{influxdb_config, InfluxDBConfig1} | Config],
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config1)
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_boolean_variants(Config) ->
|
||||||
|
QueryMode = ?config(query_mode, Config),
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config)
|
||||||
|
),
|
||||||
|
BoolVariants = #{
|
||||||
|
true => true,
|
||||||
|
false => false,
|
||||||
|
<<"t">> => true,
|
||||||
|
<<"f">> => false,
|
||||||
|
<<"T">> => true,
|
||||||
|
<<"F">> => false,
|
||||||
|
<<"TRUE">> => true,
|
||||||
|
<<"FALSE">> => false,
|
||||||
|
<<"True">> => true,
|
||||||
|
<<"False">> => false
|
||||||
|
},
|
||||||
|
maps:foreach(
|
||||||
|
fun(BoolVariant, Translation) ->
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Payload = #{
|
||||||
|
int_key => -123,
|
||||||
|
bool => BoolVariant,
|
||||||
|
uint_key => 123
|
||||||
|
},
|
||||||
|
SentData = #{
|
||||||
|
<<"clientid">> => ClientId,
|
||||||
|
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
<<"timestamp">> => erlang:system_time(nanosecond),
|
||||||
|
<<"payload">> => Payload
|
||||||
|
},
|
||||||
|
?assertEqual(ok, send_message(Config, SentData)),
|
||||||
|
case QueryMode of
|
||||||
|
async -> ct:sleep(500);
|
||||||
|
sync -> ok
|
||||||
|
end,
|
||||||
|
PersistedData = query_by_clientid(ClientId, Config),
|
||||||
|
Expected = #{
|
||||||
|
bool => atom_to_binary(Translation),
|
||||||
|
int_value => <<"-123">>,
|
||||||
|
uint_value => <<"123">>,
|
||||||
|
payload => emqx_json:encode(Payload)
|
||||||
|
},
|
||||||
|
assert_persisted_data(ClientId, Expected, PersistedData),
|
||||||
|
ok
|
||||||
|
end,
|
||||||
|
BoolVariants
|
||||||
|
),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_bad_timestamp(Config) ->
|
||||||
|
InfluxDBType = ?config(influxdb_type, Config),
|
||||||
|
InfluxDBName = ?config(influxdb_name, Config),
|
||||||
|
QueryMode = ?config(query_mode, Config),
|
||||||
|
EnableBatch = ?config(enable_batch, Config),
|
||||||
|
InfluxDBConfigString0 = ?config(influxdb_config_string, Config),
|
||||||
|
InfluxDBTypeCfg =
|
||||||
|
case InfluxDBType of
|
||||||
|
apiv1 -> "influxdb_api_v1";
|
||||||
|
apiv2 -> "influxdb_api_v2"
|
||||||
|
end,
|
||||||
|
WriteSyntax =
|
||||||
|
%% N.B.: this single space characters are relevant
|
||||||
|
<<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,",
|
||||||
|
"uint_value=${payload.uint_key}u,"
|
||||||
|
"bool=${payload.bool}", " ", "bad_timestamp">>,
|
||||||
|
%% append this to override the config
|
||||||
|
InfluxDBConfigString1 =
|
||||||
|
io_lib:format(
|
||||||
|
"bridges.~s.~s {\n"
|
||||||
|
" write_syntax = \"~s\"\n"
|
||||||
|
"}\n",
|
||||||
|
[InfluxDBTypeCfg, InfluxDBName, WriteSyntax]
|
||||||
|
),
|
||||||
|
InfluxDBConfig1 = parse_and_check(
|
||||||
|
InfluxDBConfigString0 ++ InfluxDBConfigString1,
|
||||||
|
InfluxDBType,
|
||||||
|
InfluxDBName
|
||||||
|
),
|
||||||
|
Config1 = [{influxdb_config, InfluxDBConfig1} | Config],
|
||||||
|
?assertMatch(
|
||||||
|
{ok, _},
|
||||||
|
create_bridge(Config1)
|
||||||
|
),
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Payload = #{
|
||||||
|
int_key => -123,
|
||||||
|
bool => false,
|
||||||
|
uint_key => 123
|
||||||
|
},
|
||||||
|
SentData = #{
|
||||||
|
<<"clientid">> => ClientId,
|
||||||
|
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
<<"timestamp">> => erlang:system_time(nanosecond),
|
||||||
|
<<"payload">> => Payload
|
||||||
|
},
|
||||||
|
?check_trace(
|
||||||
|
?wait_async_action(
|
||||||
|
send_message(Config1, SentData),
|
||||||
|
#{?snk_kind := influxdb_connector_send_query_error},
|
||||||
|
10_000
|
||||||
|
),
|
||||||
|
fun(Result, Trace) ->
|
||||||
|
?assertMatch({_, {ok, _}}, Result),
|
||||||
|
{Return, {ok, _}} = Result,
|
||||||
|
case {QueryMode, EnableBatch} of
|
||||||
|
{async, true} ->
|
||||||
|
?assertEqual(ok, Return),
|
||||||
|
?assertMatch(
|
||||||
|
[#{error := points_trans_failed}],
|
||||||
|
?of_kind(influxdb_connector_send_query_error, Trace)
|
||||||
|
);
|
||||||
|
{async, false} ->
|
||||||
|
?assertEqual(ok, Return),
|
||||||
|
?assertMatch(
|
||||||
|
[#{error := [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}],
|
||||||
|
?of_kind(influxdb_connector_send_query_error, Trace)
|
||||||
|
);
|
||||||
|
{sync, false} ->
|
||||||
|
?assertEqual(
|
||||||
|
{error, [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}, Return
|
||||||
|
);
|
||||||
|
{sync, true} ->
|
||||||
|
?assertEqual({error, points_trans_failed}, Return)
|
||||||
|
end,
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_get_status(Config) ->
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyName = ?config(proxy_name, Config),
|
||||||
|
{ok, _} = create_bridge(Config),
|
||||||
|
ResourceId = resource_id(Config),
|
||||||
|
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)),
|
||||||
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||||
|
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId))
|
||||||
|
end),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_create_disconnected(Config) ->
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
ProxyName = ?config(proxy_name, Config),
|
||||||
|
?check_trace(
|
||||||
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||||
|
?assertMatch({ok, _}, create_bridge(Config))
|
||||||
|
end),
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch(
|
||||||
|
[#{error := influxdb_client_not_alive}],
|
||||||
|
?of_kind(influxdb_connector_start_failed, Trace)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_start_error(Config) ->
|
||||||
|
%% simulate client start error
|
||||||
|
?check_trace(
|
||||||
|
emqx_common_test_helpers:with_mock(
|
||||||
|
influxdb,
|
||||||
|
start_client,
|
||||||
|
fun(_Config) -> {error, some_error} end,
|
||||||
|
fun() ->
|
||||||
|
?wait_async_action(
|
||||||
|
?assertMatch({ok, _}, create_bridge(Config)),
|
||||||
|
#{?snk_kind := influxdb_connector_start_failed},
|
||||||
|
10_000
|
||||||
|
)
|
||||||
|
end
|
||||||
|
),
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch(
|
||||||
|
[#{error := some_error}],
|
||||||
|
?of_kind(influxdb_connector_start_failed, Trace)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_start_exception(Config) ->
|
||||||
|
%% simulate client start exception
|
||||||
|
?check_trace(
|
||||||
|
emqx_common_test_helpers:with_mock(
|
||||||
|
influxdb,
|
||||||
|
start_client,
|
||||||
|
fun(_Config) -> error(boom) end,
|
||||||
|
fun() ->
|
||||||
|
?wait_async_action(
|
||||||
|
?assertMatch({ok, _}, create_bridge(Config)),
|
||||||
|
#{?snk_kind := influxdb_connector_start_exception},
|
||||||
|
10_000
|
||||||
|
)
|
||||||
|
end
|
||||||
|
),
|
||||||
|
fun(Trace) ->
|
||||||
|
?assertMatch(
|
||||||
|
[#{error := {error, boom}}],
|
||||||
|
?of_kind(influxdb_connector_start_exception, Trace)
|
||||||
|
),
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
t_write_failure(Config) ->
|
||||||
|
ProxyName = ?config(proxy_name, Config),
|
||||||
|
ProxyPort = ?config(proxy_port, Config),
|
||||||
|
ProxyHost = ?config(proxy_host, Config),
|
||||||
|
QueryMode = ?config(query_mode, Config),
|
||||||
|
{ok, _} = create_bridge(Config),
|
||||||
|
ClientId = emqx_guid:to_hexstr(emqx_guid:gen()),
|
||||||
|
Payload = #{
|
||||||
|
int_key => -123,
|
||||||
|
bool => true,
|
||||||
|
float_key => 24.5,
|
||||||
|
uint_key => 123
|
||||||
|
},
|
||||||
|
SentData = #{
|
||||||
|
<<"clientid">> => ClientId,
|
||||||
|
<<"topic">> => atom_to_binary(?FUNCTION_NAME),
|
||||||
|
<<"timestamp">> => erlang:system_time(nanosecond),
|
||||||
|
<<"payload">> => Payload
|
||||||
|
},
|
||||||
|
?check_trace(
|
||||||
|
emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() ->
|
||||||
|
send_message(Config, SentData)
|
||||||
|
end),
|
||||||
|
fun(Result, _Trace) ->
|
||||||
|
case QueryMode of
|
||||||
|
sync ->
|
||||||
|
?assert(
|
||||||
|
{error, {error, {closed, "The connection was lost."}}} =:= Result orelse
|
||||||
|
{error, {error, closed}} =:= Result orelse
|
||||||
|
{error, {error, econnrefused}} =:= Result,
|
||||||
|
#{got => Result}
|
||||||
|
);
|
||||||
|
async ->
|
||||||
|
?assertEqual(ok, Result)
|
||||||
|
end,
|
||||||
|
ok
|
||||||
|
end
|
||||||
|
),
|
||||||
|
ok = snabbkaffe:stop(),
|
||||||
|
ok.
|
|
@ -9,6 +9,7 @@
|
||||||
-include_lib("hocon/include/hoconsc.hrl").
|
-include_lib("hocon/include/hoconsc.hrl").
|
||||||
-include_lib("typerefl/include/types.hrl").
|
-include_lib("typerefl/include/types.hrl").
|
||||||
-include_lib("emqx/include/logger.hrl").
|
-include_lib("emqx/include/logger.hrl").
|
||||||
|
-include_lib("snabbkaffe/include/snabbkaffe.hrl").
|
||||||
|
|
||||||
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
-import(hoconsc, [mk/2, enum/1, ref/2]).
|
||||||
|
|
||||||
|
@ -51,8 +52,16 @@ on_stop(_InstId, #{client := Client}) ->
|
||||||
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
||||||
case data_to_points(Data, SyntaxLines) of
|
case data_to_points(Data, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query,
|
||||||
|
#{points => Points, batch => false, mode => sync}
|
||||||
|
),
|
||||||
do_query(InstId, Client, Points);
|
do_query(InstId, Client, Points);
|
||||||
{error, ErrorPoints} = Err ->
|
{error, ErrorPoints} = Err ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query_error,
|
||||||
|
#{batch => false, mode => sync, error => ErrorPoints}
|
||||||
|
),
|
||||||
log_error_points(InstId, ErrorPoints),
|
log_error_points(InstId, ErrorPoints),
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
@ -62,8 +71,16 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
|
||||||
on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
|
||||||
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query,
|
||||||
|
#{points => Points, batch => true, mode => sync}
|
||||||
|
),
|
||||||
do_query(InstId, Client, Points);
|
do_query(InstId, Client, Points);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query_error,
|
||||||
|
#{batch => true, mode => sync, error => Reason}
|
||||||
|
),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -75,8 +92,16 @@ on_query_async(
|
||||||
) ->
|
) ->
|
||||||
case data_to_points(Data, SyntaxLines) of
|
case data_to_points(Data, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query,
|
||||||
|
#{points => Points, batch => false, mode => async}
|
||||||
|
),
|
||||||
do_async_query(InstId, Client, Points, {ReplayFun, Args});
|
do_async_query(InstId, Client, Points, {ReplayFun, Args});
|
||||||
{error, ErrorPoints} = Err ->
|
{error, ErrorPoints} = Err ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query_error,
|
||||||
|
#{batch => false, mode => async, error => ErrorPoints}
|
||||||
|
),
|
||||||
log_error_points(InstId, ErrorPoints),
|
log_error_points(InstId, ErrorPoints),
|
||||||
Err
|
Err
|
||||||
end.
|
end.
|
||||||
|
@ -89,8 +114,16 @@ on_batch_query_async(
|
||||||
) ->
|
) ->
|
||||||
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
case parse_batch_data(InstId, BatchData, SyntaxLines) of
|
||||||
{ok, Points} ->
|
{ok, Points} ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query,
|
||||||
|
#{points => Points, batch => true, mode => async}
|
||||||
|
),
|
||||||
do_async_query(InstId, Client, Points, {ReplayFun, Args});
|
do_async_query(InstId, Client, Points, {ReplayFun, Args});
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
?tp(
|
||||||
|
influxdb_connector_send_query_error,
|
||||||
|
#{batch => true, mode => async, error => Reason}
|
||||||
|
),
|
||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -163,6 +196,7 @@ start_client(InstId, Config) ->
|
||||||
do_start_client(InstId, ClientConfig, Config)
|
do_start_client(InstId, ClientConfig, Config)
|
||||||
catch
|
catch
|
||||||
E:R:S ->
|
E:R:S ->
|
||||||
|
?tp(influxdb_connector_start_exception, #{error => {E, R}}),
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "start influxdb connector error",
|
msg => "start influxdb connector error",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
|
@ -196,6 +230,7 @@ do_start_client(
|
||||||
}),
|
}),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
false ->
|
false ->
|
||||||
|
?tp(influxdb_connector_start_failed, #{error => influxdb_client_not_alive}),
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "starting influxdb connector failed",
|
msg => "starting influxdb connector failed",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
|
@ -205,14 +240,16 @@ do_start_client(
|
||||||
{error, influxdb_client_not_alive}
|
{error, influxdb_client_not_alive}
|
||||||
end;
|
end;
|
||||||
{error, {already_started, Client0}} ->
|
{error, {already_started, Client0}} ->
|
||||||
|
?tp(influxdb_connector_start_already_started, #{}),
|
||||||
?SLOG(info, #{
|
?SLOG(info, #{
|
||||||
msg => "starting influxdb connector,find already started client",
|
msg => "restarting influxdb connector, found already started client",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
old_client => Client0
|
old_client => Client0
|
||||||
}),
|
}),
|
||||||
_ = influxdb:stop_client(Client0),
|
_ = influxdb:stop_client(Client0),
|
||||||
do_start_client(InstId, ClientConfig, Config);
|
do_start_client(InstId, ClientConfig, Config);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
|
?tp(influxdb_connector_start_failed, #{error => Reason}),
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "starting influxdb connector failed",
|
msg => "starting influxdb connector failed",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
|
@ -235,7 +272,7 @@ client_config(
|
||||||
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
|
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
|
||||||
] ++ protocol_config(Config).
|
] ++ protocol_config(Config).
|
||||||
|
|
||||||
%% api v2 config
|
%% api v1 config
|
||||||
protocol_config(#{
|
protocol_config(#{
|
||||||
username := Username,
|
username := Username,
|
||||||
password := Password,
|
password := Password,
|
||||||
|
@ -249,7 +286,7 @@ protocol_config(#{
|
||||||
{password, str(Password)},
|
{password, str(Password)},
|
||||||
{database, str(DB)}
|
{database, str(DB)}
|
||||||
] ++ ssl_config(SSL);
|
] ++ ssl_config(SSL);
|
||||||
%% api v1 config
|
%% api v2 config
|
||||||
protocol_config(#{
|
protocol_config(#{
|
||||||
bucket := Bucket,
|
bucket := Bucket,
|
||||||
org := Org,
|
org := Org,
|
||||||
|
@ -290,6 +327,7 @@ do_query(InstId, Client, Points) ->
|
||||||
points => Points
|
points => Points
|
||||||
});
|
});
|
||||||
{error, Reason} = Err ->
|
{error, Reason} = Err ->
|
||||||
|
?tp(influxdb_connector_do_query_failure, #{error => Reason}),
|
||||||
?SLOG(error, #{
|
?SLOG(error, #{
|
||||||
msg => "influxdb write point failed",
|
msg => "influxdb write point failed",
|
||||||
connector => InstId,
|
connector => InstId,
|
||||||
|
@ -370,6 +408,14 @@ parse_batch_data(InstId, BatchData, SyntaxLines) ->
|
||||||
{error, points_trans_failed}
|
{error, points_trans_failed}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec data_to_points(map(), [
|
||||||
|
#{
|
||||||
|
fields := [{binary(), binary()}],
|
||||||
|
measurement := binary(),
|
||||||
|
tags := [{binary(), binary()}],
|
||||||
|
timestamp := binary()
|
||||||
|
}
|
||||||
|
]) -> {ok, [map()]} | {error, term()}.
|
||||||
data_to_points(Data, SyntaxLines) ->
|
data_to_points(Data, SyntaxLines) ->
|
||||||
lines_to_points(Data, SyntaxLines, [], []).
|
lines_to_points(Data, SyntaxLines, [], []).
|
||||||
|
|
||||||
|
@ -416,17 +462,18 @@ lines_to_points(
|
||||||
end.
|
end.
|
||||||
|
|
||||||
maps_config_to_data(K, V, {Data, Res}) ->
|
maps_config_to_data(K, V, {Data, Res}) ->
|
||||||
KTransOptions = #{return => full_binary},
|
KTransOptions = #{return => rawlist, var_trans => fun key_filter/1},
|
||||||
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
VTransOptions = #{return => rawlist, var_trans => fun data_filter/1},
|
||||||
NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
|
NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions),
|
||||||
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions),
|
NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions),
|
||||||
case {NK, NV} of
|
case {NK0, NV} of
|
||||||
{[undefined], _} ->
|
{[undefined], _} ->
|
||||||
{Data, Res};
|
{Data, Res};
|
||||||
%% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
|
%% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>]
|
||||||
{_, [undefined | _]} ->
|
{_, [undefined | _]} ->
|
||||||
{Data, Res};
|
{Data, Res};
|
||||||
_ ->
|
_ ->
|
||||||
|
NK = list_to_binary(NK0),
|
||||||
{Data, Res#{NK => value_type(NV)}}
|
{Data, Res#{NK => value_type(NV)}}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
@ -438,6 +485,8 @@ value_type([UInt, <<"u">>]) when
|
||||||
is_integer(UInt)
|
is_integer(UInt)
|
||||||
->
|
->
|
||||||
{uint, UInt};
|
{uint, UInt};
|
||||||
|
value_type([Float]) when is_float(Float) ->
|
||||||
|
Float;
|
||||||
value_type([<<"t">>]) ->
|
value_type([<<"t">>]) ->
|
||||||
't';
|
't';
|
||||||
value_type([<<"T">>]) ->
|
value_type([<<"T">>]) ->
|
||||||
|
@ -461,6 +510,9 @@ value_type([<<"False">>]) ->
|
||||||
value_type(Val) ->
|
value_type(Val) ->
|
||||||
Val.
|
Val.
|
||||||
|
|
||||||
|
key_filter(undefined) -> undefined;
|
||||||
|
key_filter(Value) -> emqx_plugin_libs_rule:bin(Value).
|
||||||
|
|
||||||
data_filter(undefined) -> undefined;
|
data_filter(undefined) -> undefined;
|
||||||
data_filter(Int) when is_integer(Int) -> Int;
|
data_filter(Int) when is_integer(Int) -> Int;
|
||||||
data_filter(Number) when is_number(Number) -> Number;
|
data_filter(Number) when is_number(Number) -> Number;
|
||||||
|
@ -500,3 +552,56 @@ str(B) when is_binary(B) ->
|
||||||
binary_to_list(B);
|
binary_to_list(B);
|
||||||
str(S) when is_list(S) ->
|
str(S) when is_list(S) ->
|
||||||
S.
|
S.
|
||||||
|
|
||||||
|
%%===================================================================
|
||||||
|
%% eunit tests
|
||||||
|
%%===================================================================
|
||||||
|
|
||||||
|
-ifdef(TEST).
|
||||||
|
-include_lib("eunit/include/eunit.hrl").
|
||||||
|
|
||||||
|
to_server_raw_test_() ->
|
||||||
|
[
|
||||||
|
?_assertEqual(
|
||||||
|
{"foobar", 1234},
|
||||||
|
to_server_raw(<<"http://foobar:1234">>)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
{"foobar", 1234},
|
||||||
|
to_server_raw(<<"https://foobar:1234">>)
|
||||||
|
),
|
||||||
|
?_assertEqual(
|
||||||
|
{"foobar", 1234},
|
||||||
|
to_server_raw(<<"foobar:1234">>)
|
||||||
|
)
|
||||||
|
].
|
||||||
|
|
||||||
|
%% for coverage
|
||||||
|
desc_test_() ->
|
||||||
|
[
|
||||||
|
?_assertMatch(
|
||||||
|
{desc, _, _},
|
||||||
|
desc(common)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
{desc, _, _},
|
||||||
|
desc(influxdb_udp)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
{desc, _, _},
|
||||||
|
desc(influxdb_api_v1)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
{desc, _, _},
|
||||||
|
desc(influxdb_api_v2)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
{desc, _, _},
|
||||||
|
server(desc)
|
||||||
|
),
|
||||||
|
?_assertMatch(
|
||||||
|
connector_influxdb,
|
||||||
|
namespace()
|
||||||
|
)
|
||||||
|
].
|
||||||
|
-endif.
|
||||||
|
|
|
@ -150,7 +150,8 @@ test_deps() ->
|
||||||
{bbmustache, "1.10.0"},
|
{bbmustache, "1.10.0"},
|
||||||
{meck, "0.9.2"},
|
{meck, "0.9.2"},
|
||||||
{proper, "1.4.0"},
|
{proper, "1.4.0"},
|
||||||
{er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}}
|
{er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}},
|
||||||
|
{erl_csv, "0.2.0"}
|
||||||
].
|
].
|
||||||
|
|
||||||
common_compile_opts(Vsn) ->
|
common_compile_opts(Vsn) ->
|
||||||
|
|
|
@ -92,6 +92,11 @@ for dep in ${CT_DEPS}; do
|
||||||
erlang24)
|
erlang24)
|
||||||
FILES+=( '.ci/docker-compose-file/docker-compose.yaml' )
|
FILES+=( '.ci/docker-compose-file/docker-compose.yaml' )
|
||||||
;;
|
;;
|
||||||
|
influxdb)
|
||||||
|
FILES+=( '.ci/docker-compose-file/docker-compose-toxiproxy.yaml'
|
||||||
|
'.ci/docker-compose-file/docker-compose-influxdb-tcp.yaml'
|
||||||
|
'.ci/docker-compose-file/docker-compose-influxdb-tls.yaml' )
|
||||||
|
;;
|
||||||
mongo)
|
mongo)
|
||||||
FILES+=( '.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml'
|
FILES+=( '.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml'
|
||||||
'.ci/docker-compose-file/docker-compose-mongo-single-tls.yaml' )
|
'.ci/docker-compose-file/docker-compose-mongo-single-tls.yaml' )
|
||||||
|
|
Loading…
Reference in New Issue