diff --git a/.ci/docker-compose-file/docker-compose-influxdb-tcp.yaml b/.ci/docker-compose-file/docker-compose-influxdb-tcp.yaml new file mode 100644 index 000000000..1780bc7e2 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-influxdb-tcp.yaml @@ -0,0 +1,36 @@ +version: '3.9' + +services: + influxdb_server_tcp: + container_name: influxdb_tcp + image: influxdb:${INFLUXDB_TAG} + expose: + - "8086" + - "8089/udp" + - "8083" + # ports: + # - "8086:8086" + environment: + DOCKER_INFLUXDB_INIT_MODE: setup + DOCKER_INFLUXDB_INIT_USERNAME: root + DOCKER_INFLUXDB_INIT_PASSWORD: emqx@123 + DOCKER_INFLUXDB_INIT_ORG: emqx + DOCKER_INFLUXDB_INIT_BUCKET: mqtt + DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: abcdefg + volumes: + - "./influxdb/setup-v1.sh:/docker-entrypoint-initdb.d/setup-v1.sh" + restart: always + networks: + - emqx_bridge + +# networks: +# emqx_bridge: +# driver: bridge +# name: emqx_bridge +# ipam: +# driver: default +# config: +# - subnet: 172.100.239.0/24 +# gateway: 172.100.239.1 +# - subnet: 2001:3200:3200::/64 +# gateway: 2001:3200:3200::1 diff --git a/.ci/docker-compose-file/docker-compose-influxdb-tls.yaml b/.ci/docker-compose-file/docker-compose-influxdb-tls.yaml new file mode 100644 index 000000000..ec1600bf2 --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-influxdb-tls.yaml @@ -0,0 +1,42 @@ +version: '3.9' + +services: + influxdb_server_tls: + container_name: influxdb_tls + image: influxdb:${INFLUXDB_TAG} + expose: + - "8086" + - "8089/udp" + - "8083" + # ports: + # - "8087:8086" + environment: + DOCKER_INFLUXDB_INIT_MODE: setup + DOCKER_INFLUXDB_INIT_USERNAME: root + DOCKER_INFLUXDB_INIT_PASSWORD: emqx@123 + DOCKER_INFLUXDB_INIT_ORG: emqx + DOCKER_INFLUXDB_INIT_BUCKET: mqtt + DOCKER_INFLUXDB_INIT_ADMIN_TOKEN: abcdefg + volumes: + - ./certs/server.crt:/etc/influxdb/cert.pem + - ./certs/server.key:/etc/influxdb/key.pem + - "./influxdb/setup-v1.sh:/docker-entrypoint-initdb.d/setup-v1.sh" + command: + - influxd + - --tls-cert=/etc/influxdb/cert.pem + - --tls-key=/etc/influxdb/key.pem + restart: always + networks: + - emqx_bridge + +# networks: +# emqx_bridge: +# driver: bridge +# name: emqx_bridge +# ipam: +# driver: default +# config: +# - subnet: 172.100.239.0/24 +# gateway: 172.100.239.1 +# - subnet: 2001:3200:3200::/64 +# gateway: 2001:3200:3200::1 diff --git a/.ci/docker-compose-file/docker-compose-toxiproxy.yaml b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml new file mode 100644 index 000000000..924c9e6ae --- /dev/null +++ b/.ci/docker-compose-file/docker-compose-toxiproxy.yaml @@ -0,0 +1,18 @@ +version: '3.9' + +services: + toxiproxy: + container_name: toxiproxy + image: ghcr.io/shopify/toxiproxy:2.5.0 + restart: always + networks: + - emqx_bridge + volumes: + - "./toxiproxy.json:/config/toxiproxy.json" + ports: + - 8474:8474 + - 8086:8086 + - 8087:8087 + command: + - "-host=0.0.0.0" + - "-config=/config/toxiproxy.json" diff --git a/.ci/docker-compose-file/influxdb/setup-v1.sh b/.ci/docker-compose-file/influxdb/setup-v1.sh new file mode 100755 index 000000000..92baf9905 --- /dev/null +++ b/.ci/docker-compose-file/influxdb/setup-v1.sh @@ -0,0 +1,16 @@ +#!/usr/bin/env bash + +set -e + +# influx v1 dbrp create \ +# --bucket-id ${DOCKER_INFLUXDB_INIT_BUCKET_ID} \ +# --db ${V1_DB_NAME} \ +# --rp ${V1_RP_NAME} \ +# --default \ +# --org ${DOCKER_INFLUXDB_INIT_ORG} + +influx v1 auth create \ + --username "${DOCKER_INFLUXDB_INIT_USERNAME}" \ + --password "${DOCKER_INFLUXDB_INIT_PASSWORD}" \ + --write-bucket "${DOCKER_INFLUXDB_INIT_BUCKET_ID}" \ + --org "${DOCKER_INFLUXDB_INIT_ORG}" diff --git a/.ci/docker-compose-file/toxiproxy.json b/.ci/docker-compose-file/toxiproxy.json new file mode 100644 index 000000000..176b35d36 --- /dev/null +++ b/.ci/docker-compose-file/toxiproxy.json @@ -0,0 +1,14 @@ +[ + { + "name": "influxdb_tcp", + "listen": "0.0.0.0:8086", + "upstream": "influxdb_tcp:8086", + "enabled": true + }, + { + "name": "influxdb_tls", + "listen": "0.0.0.0:8087", + "upstream": "influxdb_tls:8086", + "enabled": true + } +] diff --git a/.github/workflows/run_test_cases.yaml b/.github/workflows/run_test_cases.yaml index 540383ed6..89480f3e5 100644 --- a/.github/workflows/run_test_cases.yaml +++ b/.github/workflows/run_test_cases.yaml @@ -120,6 +120,7 @@ jobs: MYSQL_TAG: 8 PGSQL_TAG: 13 REDIS_TAG: 6 + INFLUXDB_TAG: 2.5.0 run: | ./scripts/ct/run.sh --app ${{ matrix.app_name }} - uses: actions/upload-artifact@v1 diff --git a/Makefile b/Makefile index 3dd11aec3..d06becc5d 100644 --- a/Makefile +++ b/Makefile @@ -83,7 +83,11 @@ $(foreach app,$(APPS),$(eval $(call gen-app-prop-target,$(app)))) .PHONY: ct-suite ct-suite: $(REBAR) ifneq ($(TESTCASE),) +ifneq ($(GROUP),) + $(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE) --group $(GROUP) +else $(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --case $(TESTCASE) +endif else ifneq ($(GROUP),) $(REBAR) ct -v --readable=$(CT_READABLE) --name $(CT_NODE_NAME) --suite $(SUITE) --group $(GROUP) else diff --git a/apps/emqx/test/emqx_common_test_helpers.erl b/apps/emqx/test/emqx_common_test_helpers.erl index 8c0ed2377..196546b7b 100644 --- a/apps/emqx/test/emqx_common_test_helpers.erl +++ b/apps/emqx/test/emqx_common_test_helpers.erl @@ -65,6 +65,15 @@ stop_slave/1 ]). +-export([clear_screen/0]). +-export([with_mock/4]). + +%% Toxiproxy API +-export([ + with_failure/5, + reset_proxy/2 +]). + -define(CERTS_PATH(CertName), filename:join(["etc", "certs", CertName])). -define(MQTT_SSL_TWOWAY, [ @@ -769,3 +778,129 @@ expand_node_specs(Specs, CommonOpts) -> end, Specs ). + +%% is useful when iterating on the tests in a loop, to get rid of all +%% the garbaged printed before the test itself beings. +clear_screen() -> + io:format(standard_io, "\033[H\033[2J", []), + io:format(standard_error, "\033[H\033[2J", []), + io:format(standard_io, "\033[H\033[3J", []), + io:format(standard_error, "\033[H\033[3J", []), + ok. + +with_mock(Mod, FnName, MockedFn, Fun) -> + ok = meck:new(Mod, [non_strict, no_link, no_history, passthrough]), + ok = meck:expect(Mod, FnName, MockedFn), + try + Fun() + after + ok = meck:unload(Mod) + end. + +%%------------------------------------------------------------------------------- +%% Toxiproxy utils +%%------------------------------------------------------------------------------- + +reset_proxy(ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/reset", + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request( + post, + {Url, [], "application/json", Body}, + [], + [{body_format, binary}] + ). + +with_failure(FailureType, Name, ProxyHost, ProxyPort, Fun) -> + enable_failure(FailureType, Name, ProxyHost, ProxyPort), + try + Fun() + after + heal_failure(FailureType, Name, ProxyHost, ProxyPort) + end. + +enable_failure(FailureType, Name, ProxyHost, ProxyPort) -> + case FailureType of + down -> switch_proxy(off, Name, ProxyHost, ProxyPort); + timeout -> timeout_proxy(on, Name, ProxyHost, ProxyPort); + latency_up -> latency_up_proxy(on, Name, ProxyHost, ProxyPort) + end. + +heal_failure(FailureType, Name, ProxyHost, ProxyPort) -> + case FailureType of + down -> switch_proxy(on, Name, ProxyHost, ProxyPort); + timeout -> timeout_proxy(off, Name, ProxyHost, ProxyPort); + latency_up -> latency_up_proxy(off, Name, ProxyHost, ProxyPort) + end. + +switch_proxy(Switch, Name, ProxyHost, ProxyPort) -> + Url = "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name, + Body = + case Switch of + off -> <<"{\"enabled\":false}">>; + on -> <<"{\"enabled\":true}">> + end, + {ok, {{_, 200, _}, _, _}} = httpc:request( + post, + {Url, [], "application/json", Body}, + [], + [{body_format, binary}] + ). + +timeout_proxy(on, Name, ProxyHost, ProxyPort) -> + Url = + "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++ + "/toxics", + NameBin = list_to_binary(Name), + Body = + <<"{\"name\":\"", NameBin/binary, + "_timeout\",\"type\":\"timeout\"," + "\"stream\":\"upstream\",\"toxicity\":1.0," + "\"attributes\":{\"timeout\":0}}">>, + {ok, {{_, 200, _}, _, _}} = httpc:request( + post, + {Url, [], "application/json", Body}, + [], + [{body_format, binary}] + ); +timeout_proxy(off, Name, ProxyHost, ProxyPort) -> + ToxicName = Name ++ "_timeout", + Url = + "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++ + "/toxics/" ++ ToxicName, + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request( + delete, + {Url, [], "application/json", Body}, + [], + [{body_format, binary}] + ). + +latency_up_proxy(on, Name, ProxyHost, ProxyPort) -> + Url = + "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++ + "/toxics", + NameBin = list_to_binary(Name), + Body = + <<"{\"name\":\"", NameBin/binary, + "_latency_up\",\"type\":\"latency\"," + "\"stream\":\"upstream\",\"toxicity\":1.0," + "\"attributes\":{\"latency\":20000,\"jitter\":3000}}">>, + {ok, {{_, 200, _}, _, _}} = httpc:request( + post, + {Url, [], "application/json", Body}, + [], + [{body_format, binary}] + ); +latency_up_proxy(off, Name, ProxyHost, ProxyPort) -> + ToxicName = Name ++ "_latency_up", + Url = + "http://" ++ ProxyHost ++ ":" ++ integer_to_list(ProxyPort) ++ "/proxies/" ++ Name ++ + "/toxics/" ++ ToxicName, + Body = <<>>, + {ok, {{_, 204, _}, _, _}} = httpc:request( + delete, + {Url, [], "application/json", Body}, + [], + [{body_format, binary}] + ). diff --git a/lib-ee/emqx_ee_bridge/docker-ct b/lib-ee/emqx_ee_bridge/docker-ct index a79037903..3be129d94 100644 --- a/lib-ee/emqx_ee_bridge/docker-ct +++ b/lib-ee/emqx_ee_bridge/docker-ct @@ -1,3 +1,4 @@ +influxdb +kafka mongo mongo_rs_sharded -kafka diff --git a/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl new file mode 100644 index 000000000..a05a26e29 --- /dev/null +++ b/lib-ee/emqx_ee_bridge/test/emqx_ee_bridge_influxdb_SUITE.erl @@ -0,0 +1,863 @@ +%%-------------------------------------------------------------------- +%% Copyright (c) 2022 EMQ Technologies Co., Ltd. All Rights Reserved. +%%-------------------------------------------------------------------- +-module(emqx_ee_bridge_influxdb_SUITE). + +-compile(nowarn_export_all). +-compile(export_all). + +-include_lib("eunit/include/eunit.hrl"). +-include_lib("common_test/include/ct.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). + +%%------------------------------------------------------------------------------ +%% CT boilerplate +%%------------------------------------------------------------------------------ + +all() -> + [ + {group, with_batch}, + {group, without_batch} + ]. + +groups() -> + TCs = emqx_common_test_helpers:all(?MODULE), + [ + {with_batch, [ + {group, sync_query}, + {group, async_query} + ]}, + {without_batch, [ + {group, sync_query}, + {group, async_query} + ]}, + {sync_query, [ + {group, apiv1_tcp}, + {group, apiv1_tls}, + {group, apiv2_tcp}, + {group, apiv2_tls} + ]}, + {async_query, [ + {group, apiv1_tcp}, + {group, apiv1_tls}, + {group, apiv2_tcp}, + {group, apiv2_tls} + ]}, + {apiv1_tcp, TCs}, + {apiv1_tls, TCs}, + {apiv2_tcp, TCs}, + {apiv2_tls, TCs} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok = emqx_common_test_helpers:stop_apps([emqx_conf]), + ok = emqx_connector_test_helpers:stop_apps([emqx_bridge, emqx_resource]), + _ = application:stop(emqx_connector), + ok. + +init_per_group(InfluxDBType, Config0) when + InfluxDBType =:= apiv1_tcp; + InfluxDBType =:= apiv1_tls +-> + #{ + host := InfluxDBHost, + port := InfluxDBPort, + use_tls := UseTLS, + proxy_name := ProxyName + } = + case InfluxDBType of + apiv1_tcp -> + #{ + host => os:getenv("INFLUXDB_APIV1_TCP_HOST", "toxiproxy"), + port => list_to_integer(os:getenv("INFLUXDB_APIV1_TCP_PORT", "8086")), + use_tls => false, + proxy_name => "influxdb_tcp" + }; + apiv1_tls -> + #{ + host => os:getenv("INFLUXDB_APIV1_TLS_HOST", "toxiproxy"), + port => list_to_integer(os:getenv("INFLUXDB_APIV1_TLS_PORT", "8087")), + use_tls => true, + proxy_name => "influxdb_tls" + } + end, + case emqx_common_test_helpers:is_tcp_server_available(InfluxDBHost, InfluxDBPort) of + true -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]), + {ok, _} = application:ensure_all_started(emqx_connector), + Config = [{use_tls, UseTLS} | Config0], + {Name, ConfigString, InfluxDBConfig} = influxdb_config( + apiv1, InfluxDBHost, InfluxDBPort, Config + ), + EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv1">>, + EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin), + {EHttpcTransport, EHttpcTransportOpts} = + case UseTLS of + true -> {tls, [{verify, verify_none}]}; + false -> {tcp, []} + end, + EHttpcPoolOpts = [ + {host, InfluxDBHost}, + {port, InfluxDBPort}, + {pool_size, 1}, + {transport, EHttpcTransport}, + {transport_opts, EHttpcTransportOpts} + ], + {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts), + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {proxy_name, ProxyName}, + {influxdb_host, InfluxDBHost}, + {influxdb_port, InfluxDBPort}, + {influxdb_type, apiv1}, + {influxdb_config, InfluxDBConfig}, + {influxdb_config_string, ConfigString}, + {ehttpc_pool_name, EHttpcPoolName}, + {influxdb_name, Name} + | Config + ]; + false -> + {skip, no_influxdb} + end; +init_per_group(InfluxDBType, Config0) when + InfluxDBType =:= apiv2_tcp; + InfluxDBType =:= apiv2_tls +-> + #{ + host := InfluxDBHost, + port := InfluxDBPort, + use_tls := UseTLS, + proxy_name := ProxyName + } = + case InfluxDBType of + apiv2_tcp -> + #{ + host => os:getenv("INFLUXDB_APIV2_TCP_HOST", "toxiproxy"), + port => list_to_integer(os:getenv("INFLUXDB_APIV2_TCP_PORT", "8086")), + use_tls => false, + proxy_name => "influxdb_tcp" + }; + apiv2_tls -> + #{ + host => os:getenv("INFLUXDB_APIV2_TLS_HOST", "toxiproxy"), + port => list_to_integer(os:getenv("INFLUXDB_APIV2_TLS_PORT", "8087")), + use_tls => true, + proxy_name => "influxdb_tls" + } + end, + case emqx_common_test_helpers:is_tcp_server_available(InfluxDBHost, InfluxDBPort) of + true -> + ProxyHost = os:getenv("PROXY_HOST", "toxiproxy"), + ProxyPort = list_to_integer(os:getenv("PROXY_PORT", "8474")), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ok = emqx_common_test_helpers:start_apps([emqx_conf]), + ok = emqx_connector_test_helpers:start_apps([emqx_resource, emqx_bridge]), + {ok, _} = application:ensure_all_started(emqx_connector), + Config = [{use_tls, UseTLS} | Config0], + {Name, ConfigString, InfluxDBConfig} = influxdb_config( + apiv2, InfluxDBHost, InfluxDBPort, Config + ), + EHttpcPoolNameBin = <<(atom_to_binary(?MODULE))/binary, "_apiv2">>, + EHttpcPoolName = binary_to_atom(EHttpcPoolNameBin), + {EHttpcTransport, EHttpcTransportOpts} = + case UseTLS of + true -> {tls, [{verify, verify_none}]}; + false -> {tcp, []} + end, + EHttpcPoolOpts = [ + {host, InfluxDBHost}, + {port, InfluxDBPort}, + {pool_size, 1}, + {transport, EHttpcTransport}, + {transport_opts, EHttpcTransportOpts} + ], + {ok, _} = ehttpc_sup:start_pool(EHttpcPoolName, EHttpcPoolOpts), + [ + {proxy_host, ProxyHost}, + {proxy_port, ProxyPort}, + {proxy_name, ProxyName}, + {influxdb_host, InfluxDBHost}, + {influxdb_port, InfluxDBPort}, + {influxdb_type, apiv2}, + {influxdb_config, InfluxDBConfig}, + {influxdb_config_string, ConfigString}, + {ehttpc_pool_name, EHttpcPoolName}, + {influxdb_name, Name} + | Config + ]; + false -> + {skip, no_influxdb} + end; +init_per_group(sync_query, Config) -> + [{query_mode, sync} | Config]; +init_per_group(async_query, Config) -> + [{query_mode, async} | Config]; +init_per_group(with_batch, Config) -> + [{enable_batch, true} | Config]; +init_per_group(without_batch, Config) -> + [{enable_batch, false} | Config]; +init_per_group(_Group, Config) -> + Config. + +end_per_group(Group, Config) when + Group =:= apiv1_tcp; + Group =:= apiv1_tls; + Group =:= apiv2_tcp; + Group =:= apiv2_tls +-> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + EHttpcPoolName = ?config(ehttpc_pool_name, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + ehttpc_sup:stop_pool(EHttpcPoolName), + delete_bridge(Config), + ok; +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_Testcase, Config) -> + %% catch clear_db(Config), + %% delete_bridge(Config), + delete_all_bridges(), + Config. + +end_per_testcase(_Testcase, Config) -> + ProxyHost = ?config(proxy_host, Config), + ProxyPort = ?config(proxy_port, Config), + emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), + %% catch clear_db(Config), + %% delete_bridge(Config), + delete_all_bridges(), + ok. + +%%------------------------------------------------------------------------------ +%% Helper fns +%%------------------------------------------------------------------------------ + +example_write_syntax() -> + %% N.B.: this single space character is relevant + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "float_value=${payload.float_key},", "undef_value=${payload.undef},", + "${undef_key}=\"hard-coded-value\",", "bool=${payload.bool}">>. + +influxdb_config(apiv1 = Type, InfluxDBHost, InfluxDBPort, Config) -> + EnableBatch = proplists:get_value(enable_batch, Config, true), + QueryMode = proplists:get_value(query_mode, Config, sync), + UseTLS = proplists:get_value(use_tls, Config, false), + Name = atom_to_binary(?MODULE), + WriteSyntax = example_write_syntax(), + ConfigString = + io_lib:format( + "bridges.influxdb_api_v1.~s {\n" + " enable = true\n" + " server = \"~p:~b\"\n" + " database = mqtt\n" + " username = root\n" + " password = emqx@123\n" + " precision = ns\n" + " write_syntax = \"~s\"\n" + " resource_opts = {\n" + " enable_batch = ~p\n" + " query_mode = ~s\n" + " }\n" + " ssl {\n" + " enable = ~p\n" + " verify = verify_none\n" + " }\n" + "}\n", + [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] + ), + {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}; +influxdb_config(apiv2 = Type, InfluxDBHost, InfluxDBPort, Config) -> + EnableBatch = proplists:get_value(enable_batch, Config, true), + QueryMode = proplists:get_value(query_mode, Config, sync), + UseTLS = proplists:get_value(use_tls, Config, false), + Name = atom_to_binary(?MODULE), + WriteSyntax = example_write_syntax(), + ConfigString = + io_lib:format( + "bridges.influxdb_api_v2.~s {\n" + " enable = true\n" + " server = \"~p:~b\"\n" + " bucket = mqtt\n" + " org = emqx\n" + " token = abcdefg\n" + " precision = ns\n" + " write_syntax = \"~s\"\n" + " resource_opts = {\n" + " enable_batch = ~p\n" + " query_mode = ~s\n" + " }\n" + " ssl {\n" + " enable = ~p\n" + " verify = verify_none\n" + " }\n" + "}\n", + [Name, InfluxDBHost, InfluxDBPort, WriteSyntax, EnableBatch, QueryMode, UseTLS] + ), + {Name, ConfigString, parse_and_check(ConfigString, Type, Name)}. + +parse_and_check(ConfigString, Type, Name) -> + {ok, RawConf} = hocon:binary(ConfigString, #{format => map}), + TypeBin = influxdb_type_bin(Type), + hocon_tconf:check_plain(emqx_bridge_schema, RawConf, #{required => false, atom_key => false}), + #{<<"bridges">> := #{TypeBin := #{Name := Config}}} = RawConf, + Config. + +influxdb_type_bin(apiv1) -> + <<"influxdb_api_v1">>; +influxdb_type_bin(apiv2) -> + <<"influxdb_api_v2">>. + +create_bridge(Config) -> + Type = influxdb_type_bin(?config(influxdb_type, Config)), + Name = ?config(influxdb_name, Config), + InfluxDBConfig = ?config(influxdb_config, Config), + emqx_bridge:create(Type, Name, InfluxDBConfig). + +delete_bridge(Config) -> + Type = influxdb_type_bin(?config(influxdb_type, Config)), + Name = ?config(influxdb_name, Config), + emqx_bridge:remove(Type, Name). + +delete_all_bridges() -> + lists:foreach( + fun(#{name := Name, type := Type}) -> + emqx_bridge:remove(Type, Name) + end, + emqx_bridge:list() + ). + +send_message(Config, Payload) -> + Name = ?config(influxdb_name, Config), + Type = influxdb_type_bin(?config(influxdb_type, Config)), + BridgeId = emqx_bridge_resource:bridge_id(Type, Name), + emqx_bridge:send_message(BridgeId, Payload). + +query_by_clientid(ClientId, Config) -> + InfluxDBHost = ?config(influxdb_host, Config), + InfluxDBPort = ?config(influxdb_port, Config), + EHttpcPoolName = ?config(ehttpc_pool_name, Config), + UseTLS = ?config(use_tls, Config), + Path = <<"/api/v2/query?org=emqx">>, + Scheme = + case UseTLS of + true -> <<"https://">>; + false -> <<"http://">> + end, + URI = iolist_to_binary([ + Scheme, + list_to_binary(InfluxDBHost), + ":", + integer_to_binary(InfluxDBPort), + Path + ]), + Query = + << + "from(bucket: \"mqtt\")\n" + " |> range(start: -12h)\n" + " |> filter(fn: (r) => r.clientid == \"", + ClientId/binary, + "\")" + >>, + Headers = [ + {"Authorization", "Token abcdefg"}, + {"Content-Type", "application/json"} + ], + Body = + emqx_json:encode(#{ + query => Query, + dialect => #{ + header => true, + delimiter => <<";">> + } + }), + {ok, 200, _Headers, RawBody0} = + ehttpc:request( + EHttpcPoolName, + post, + {URI, Headers, Body}, + _Timeout = 10_000, + _Retry = 0 + ), + RawBody1 = iolist_to_binary(string:replace(RawBody0, <<"\r\n">>, <<"\n">>, all)), + {ok, DecodedCSV0} = erl_csv:decode(RawBody1, #{separator => <<$;>>}), + DecodedCSV1 = [ + [Field || Field <- Line, Field =/= <<>>] + || Line <- DecodedCSV0, + Line =/= [<<>>] + ], + DecodedCSV2 = csv_lines_to_maps(DecodedCSV1, []), + index_by_field(DecodedCSV2). + +decode_csv(RawBody) -> + Lines = + [ + binary:split(Line, [<<";">>], [global, trim_all]) + || Line <- binary:split(RawBody, [<<"\r\n">>], [global, trim_all]) + ], + csv_lines_to_maps(Lines, []). + +csv_lines_to_maps([Fields, Data | Rest], Acc) -> + Map = maps:from_list(lists:zip(Fields, Data)), + csv_lines_to_maps(Rest, [Map | Acc]); +csv_lines_to_maps(_Data, Acc) -> + lists:reverse(Acc). + +index_by_field(DecodedCSV) -> + maps:from_list([{Field, Data} || Data = #{<<"_field">> := Field} <- DecodedCSV]). + +assert_persisted_data(ClientId, Expected, PersistedData) -> + ClientIdIntKey = <>, + maps:foreach( + fun + (int_value, ExpectedValue) -> + ?assertMatch( + #{<<"_value">> := ExpectedValue}, + maps:get(ClientIdIntKey, PersistedData) + ); + (Key, ExpectedValue) -> + ?assertMatch( + #{<<"_value">> := ExpectedValue}, + maps:get(atom_to_binary(Key), PersistedData), + #{expected => ExpectedValue} + ) + end, + Expected + ), + ok. + +resource_id(Config) -> + Type = influxdb_type_bin(?config(influxdb_type, Config)), + Name = ?config(influxdb_name, Config), + emqx_bridge_resource:resource_id(Type, Name). + +instance_id(Config) -> + ResourceId = resource_id(Config), + [{_, InstanceId}] = ets:lookup(emqx_resource_manager, {owner, ResourceId}), + InstanceId. + +%%------------------------------------------------------------------------------ +%% Testcases +%%------------------------------------------------------------------------------ + +t_start_ok(Config) -> + QueryMode = ?config(query_mode, Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => true, + float_key => 24.5, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(nanosecond), + <<"payload">> => Payload + }, + ?check_trace( + begin + ?assertEqual(ok, send_message(Config, SentData)), + case QueryMode of + async -> ct:sleep(500); + sync -> ok + end, + PersistedData = query_by_clientid(ClientId, Config), + Expected = #{ + bool => <<"true">>, + int_value => <<"-123">>, + uint_value => <<"123">>, + float_value => <<"24.5">>, + payload => emqx_json:encode(Payload) + }, + assert_persisted_data(ClientId, Expected, PersistedData), + ok + end, + fun(Trace0) -> + Trace = ?of_kind(influxdb_connector_send_query, Trace0), + ?assertMatch([#{points := [_]}], Trace), + [#{points := [Point]}] = Trace, + ct:pal("sent point: ~p", [Point]), + ?assertMatch( + #{ + fields := #{}, + measurement := <<_/binary>>, + tags := #{}, + timestamp := TS + } when is_integer(TS), + Point + ), + #{fields := Fields} = Point, + ?assert(lists:all(fun is_binary/1, maps:keys(Fields))), + ?assertNot(maps:is_key(<<"undefined">>, Fields)), + ?assertNot(maps:is_key(<<"undef_value">>, Fields)), + ok + end + ), + ok = snabbkaffe:stop(), + ok. + +t_start_already_started(Config) -> + Type = influxdb_type_bin(?config(influxdb_type, Config)), + Name = ?config(influxdb_name, Config), + InfluxDBConfigString = ?config(influxdb_config_string, Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + InstanceId = instance_id(Config), + TypeAtom = binary_to_atom(Type), + NameAtom = binary_to_atom(Name), + {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( + emqx_bridge_schema, InfluxDBConfigString + ), + ?check_trace( + emqx_ee_connector_influxdb:on_start(InstanceId, InfluxDBConfigMap), + fun(Result, Trace) -> + ?assertMatch({ok, _}, Result), + ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), + ok + end + ), + ok = snabbkaffe:stop(), + ok. + +t_start_ok_timestamp_write_syntax(Config) -> + InfluxDBType = ?config(influxdb_type, Config), + InfluxDBName = ?config(influxdb_name, Config), + InfluxDBConfigString0 = ?config(influxdb_config_string, Config), + InfluxDBTypeCfg = + case InfluxDBType of + apiv1 -> "influxdb_api_v1"; + apiv2 -> "influxdb_api_v2" + end, + WriteSyntax = + %% N.B.: this single space characters are relevant + <<"${topic},clientid=${clientid}", " ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "bool=${payload.bool}", " ", "${timestamp}">>, + %% append this to override the config + InfluxDBConfigString1 = + io_lib:format( + "bridges.~s.~s {\n" + " write_syntax = \"~s\"\n" + "}\n", + [InfluxDBTypeCfg, InfluxDBName, WriteSyntax] + ), + InfluxDBConfig1 = parse_and_check( + InfluxDBConfigString0 ++ InfluxDBConfigString1, + InfluxDBType, + InfluxDBName + ), + Config1 = [{influxdb_config, InfluxDBConfig1} | Config], + ?assertMatch( + {ok, _}, + create_bridge(Config1) + ), + ok. + +t_start_ok_no_subject_tags_write_syntax(Config) -> + InfluxDBType = ?config(influxdb_type, Config), + InfluxDBName = ?config(influxdb_name, Config), + InfluxDBConfigString0 = ?config(influxdb_config_string, Config), + InfluxDBTypeCfg = + case InfluxDBType of + apiv1 -> "influxdb_api_v1"; + apiv2 -> "influxdb_api_v2" + end, + WriteSyntax = + %% N.B.: this single space characters are relevant + <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "bool=${payload.bool}", " ", "${timestamp}">>, + %% append this to override the config + InfluxDBConfigString1 = + io_lib:format( + "bridges.~s.~s {\n" + " write_syntax = \"~s\"\n" + "}\n", + [InfluxDBTypeCfg, InfluxDBName, WriteSyntax] + ), + InfluxDBConfig1 = parse_and_check( + InfluxDBConfigString0 ++ InfluxDBConfigString1, + InfluxDBType, + InfluxDBName + ), + Config1 = [{influxdb_config, InfluxDBConfig1} | Config], + ?assertMatch( + {ok, _}, + create_bridge(Config1) + ), + ok. + +t_boolean_variants(Config) -> + QueryMode = ?config(query_mode, Config), + ?assertMatch( + {ok, _}, + create_bridge(Config) + ), + BoolVariants = #{ + true => true, + false => false, + <<"t">> => true, + <<"f">> => false, + <<"T">> => true, + <<"F">> => false, + <<"TRUE">> => true, + <<"FALSE">> => false, + <<"True">> => true, + <<"False">> => false + }, + maps:foreach( + fun(BoolVariant, Translation) -> + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => BoolVariant, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(nanosecond), + <<"payload">> => Payload + }, + ?assertEqual(ok, send_message(Config, SentData)), + case QueryMode of + async -> ct:sleep(500); + sync -> ok + end, + PersistedData = query_by_clientid(ClientId, Config), + Expected = #{ + bool => atom_to_binary(Translation), + int_value => <<"-123">>, + uint_value => <<"123">>, + payload => emqx_json:encode(Payload) + }, + assert_persisted_data(ClientId, Expected, PersistedData), + ok + end, + BoolVariants + ), + ok. + +t_bad_timestamp(Config) -> + InfluxDBType = ?config(influxdb_type, Config), + InfluxDBName = ?config(influxdb_name, Config), + QueryMode = ?config(query_mode, Config), + EnableBatch = ?config(enable_batch, Config), + InfluxDBConfigString0 = ?config(influxdb_config_string, Config), + InfluxDBTypeCfg = + case InfluxDBType of + apiv1 -> "influxdb_api_v1"; + apiv2 -> "influxdb_api_v2" + end, + WriteSyntax = + %% N.B.: this single space characters are relevant + <<"${topic}", " ", "payload=${payload},", "${clientid}_int_value=${payload.int_key}i,", + "uint_value=${payload.uint_key}u," + "bool=${payload.bool}", " ", "bad_timestamp">>, + %% append this to override the config + InfluxDBConfigString1 = + io_lib:format( + "bridges.~s.~s {\n" + " write_syntax = \"~s\"\n" + "}\n", + [InfluxDBTypeCfg, InfluxDBName, WriteSyntax] + ), + InfluxDBConfig1 = parse_and_check( + InfluxDBConfigString0 ++ InfluxDBConfigString1, + InfluxDBType, + InfluxDBName + ), + Config1 = [{influxdb_config, InfluxDBConfig1} | Config], + ?assertMatch( + {ok, _}, + create_bridge(Config1) + ), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => false, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(nanosecond), + <<"payload">> => Payload + }, + ?check_trace( + ?wait_async_action( + send_message(Config1, SentData), + #{?snk_kind := influxdb_connector_send_query_error}, + 10_000 + ), + fun(Result, Trace) -> + ?assertMatch({_, {ok, _}}, Result), + {Return, {ok, _}} = Result, + case {QueryMode, EnableBatch} of + {async, true} -> + ?assertEqual(ok, Return), + ?assertMatch( + [#{error := points_trans_failed}], + ?of_kind(influxdb_connector_send_query_error, Trace) + ); + {async, false} -> + ?assertEqual(ok, Return), + ?assertMatch( + [#{error := [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}], + ?of_kind(influxdb_connector_send_query_error, Trace) + ); + {sync, false} -> + ?assertEqual( + {error, [{error, {bad_timestamp, [<<"bad_timestamp">>]}}]}, Return + ); + {sync, true} -> + ?assertEqual({error, points_trans_failed}, Return) + end, + ok + end + ), + ok = snabbkaffe:stop(), + ok. + +t_get_status(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config), + ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)), + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)) + end), + ok. + +t_create_disconnected(Config) -> + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + ProxyName = ?config(proxy_name, Config), + ?check_trace( + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + ?assertMatch({ok, _}, create_bridge(Config)) + end), + fun(Trace) -> + ?assertMatch( + [#{error := influxdb_client_not_alive}], + ?of_kind(influxdb_connector_start_failed, Trace) + ), + ok + end + ), + ok = snabbkaffe:stop(), + ok. + +t_start_error(Config) -> + %% simulate client start error + ?check_trace( + emqx_common_test_helpers:with_mock( + influxdb, + start_client, + fun(_Config) -> {error, some_error} end, + fun() -> + ?wait_async_action( + ?assertMatch({ok, _}, create_bridge(Config)), + #{?snk_kind := influxdb_connector_start_failed}, + 10_000 + ) + end + ), + fun(Trace) -> + ?assertMatch( + [#{error := some_error}], + ?of_kind(influxdb_connector_start_failed, Trace) + ), + ok + end + ), + ok = snabbkaffe:stop(), + ok. + +t_start_exception(Config) -> + %% simulate client start exception + ?check_trace( + emqx_common_test_helpers:with_mock( + influxdb, + start_client, + fun(_Config) -> error(boom) end, + fun() -> + ?wait_async_action( + ?assertMatch({ok, _}, create_bridge(Config)), + #{?snk_kind := influxdb_connector_start_exception}, + 10_000 + ) + end + ), + fun(Trace) -> + ?assertMatch( + [#{error := {error, boom}}], + ?of_kind(influxdb_connector_start_exception, Trace) + ), + ok + end + ), + ok = snabbkaffe:stop(), + ok. + +t_write_failure(Config) -> + ProxyName = ?config(proxy_name, Config), + ProxyPort = ?config(proxy_port, Config), + ProxyHost = ?config(proxy_host, Config), + QueryMode = ?config(query_mode, Config), + {ok, _} = create_bridge(Config), + ClientId = emqx_guid:to_hexstr(emqx_guid:gen()), + Payload = #{ + int_key => -123, + bool => true, + float_key => 24.5, + uint_key => 123 + }, + SentData = #{ + <<"clientid">> => ClientId, + <<"topic">> => atom_to_binary(?FUNCTION_NAME), + <<"timestamp">> => erlang:system_time(nanosecond), + <<"payload">> => Payload + }, + ?check_trace( + emqx_common_test_helpers:with_failure(down, ProxyName, ProxyHost, ProxyPort, fun() -> + send_message(Config, SentData) + end), + fun(Result, _Trace) -> + case QueryMode of + sync -> + ?assert( + {error, {error, {closed, "The connection was lost."}}} =:= Result orelse + {error, {error, closed}} =:= Result orelse + {error, {error, econnrefused}} =:= Result, + #{got => Result} + ); + async -> + ?assertEqual(ok, Result) + end, + ok + end + ), + ok = snabbkaffe:stop(), + ok. diff --git a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl index 3f86792ff..36b2ec44d 100644 --- a/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl +++ b/lib-ee/emqx_ee_connector/src/emqx_ee_connector_influxdb.erl @@ -9,6 +9,7 @@ -include_lib("hocon/include/hoconsc.hrl"). -include_lib("typerefl/include/types.hrl"). -include_lib("emqx/include/logger.hrl"). +-include_lib("snabbkaffe/include/snabbkaffe.hrl"). -import(hoconsc, [mk/2, enum/1, ref/2]). @@ -51,8 +52,16 @@ on_stop(_InstId, #{client := Client}) -> on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> case data_to_points(Data, SyntaxLines) of {ok, Points} -> + ?tp( + influxdb_connector_send_query, + #{points => Points, batch => false, mode => sync} + ), do_query(InstId, Client, Points); {error, ErrorPoints} = Err -> + ?tp( + influxdb_connector_send_query_error, + #{batch => false, mode => sync, error => ErrorPoints} + ), log_error_points(InstId, ErrorPoints), Err end. @@ -62,8 +71,16 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> + ?tp( + influxdb_connector_send_query, + #{points => Points, batch => true, mode => sync} + ), do_query(InstId, Client, Points); {error, Reason} -> + ?tp( + influxdb_connector_send_query_error, + #{batch => true, mode => sync, error => Reason} + ), {error, Reason} end. @@ -75,8 +92,16 @@ on_query_async( ) -> case data_to_points(Data, SyntaxLines) of {ok, Points} -> + ?tp( + influxdb_connector_send_query, + #{points => Points, batch => false, mode => async} + ), do_async_query(InstId, Client, Points, {ReplayFun, Args}); {error, ErrorPoints} = Err -> + ?tp( + influxdb_connector_send_query_error, + #{batch => false, mode => async, error => ErrorPoints} + ), log_error_points(InstId, ErrorPoints), Err end. @@ -89,8 +114,16 @@ on_batch_query_async( ) -> case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> + ?tp( + influxdb_connector_send_query, + #{points => Points, batch => true, mode => async} + ), do_async_query(InstId, Client, Points, {ReplayFun, Args}); {error, Reason} -> + ?tp( + influxdb_connector_send_query_error, + #{batch => true, mode => async, error => Reason} + ), {error, Reason} end. @@ -163,6 +196,7 @@ start_client(InstId, Config) -> do_start_client(InstId, ClientConfig, Config) catch E:R:S -> + ?tp(influxdb_connector_start_exception, #{error => {E, R}}), ?SLOG(error, #{ msg => "start influxdb connector error", connector => InstId, @@ -196,6 +230,7 @@ do_start_client( }), {ok, State}; false -> + ?tp(influxdb_connector_start_failed, #{error => influxdb_client_not_alive}), ?SLOG(error, #{ msg => "starting influxdb connector failed", connector => InstId, @@ -205,14 +240,16 @@ do_start_client( {error, influxdb_client_not_alive} end; {error, {already_started, Client0}} -> + ?tp(influxdb_connector_start_already_started, #{}), ?SLOG(info, #{ - msg => "starting influxdb connector,find already started client", + msg => "restarting influxdb connector, found already started client", connector => InstId, old_client => Client0 }), _ = influxdb:stop_client(Client0), do_start_client(InstId, ClientConfig, Config); {error, Reason} -> + ?tp(influxdb_connector_start_failed, #{error => Reason}), ?SLOG(error, #{ msg => "starting influxdb connector failed", connector => InstId, @@ -235,7 +272,7 @@ client_config( {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} ] ++ protocol_config(Config). -%% api v2 config +%% api v1 config protocol_config(#{ username := Username, password := Password, @@ -249,7 +286,7 @@ protocol_config(#{ {password, str(Password)}, {database, str(DB)} ] ++ ssl_config(SSL); -%% api v1 config +%% api v2 config protocol_config(#{ bucket := Bucket, org := Org, @@ -290,6 +327,7 @@ do_query(InstId, Client, Points) -> points => Points }); {error, Reason} = Err -> + ?tp(influxdb_connector_do_query_failure, #{error => Reason}), ?SLOG(error, #{ msg => "influxdb write point failed", connector => InstId, @@ -370,6 +408,14 @@ parse_batch_data(InstId, BatchData, SyntaxLines) -> {error, points_trans_failed} end. +-spec data_to_points(map(), [ + #{ + fields := [{binary(), binary()}], + measurement := binary(), + tags := [{binary(), binary()}], + timestamp := binary() + } +]) -> {ok, [map()]} | {error, term()}. data_to_points(Data, SyntaxLines) -> lines_to_points(Data, SyntaxLines, [], []). @@ -416,17 +462,18 @@ lines_to_points( end. maps_config_to_data(K, V, {Data, Res}) -> - KTransOptions = #{return => full_binary}, + KTransOptions = #{return => rawlist, var_trans => fun key_filter/1}, VTransOptions = #{return => rawlist, var_trans => fun data_filter/1}, - NK = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions), + NK0 = emqx_plugin_libs_rule:proc_tmpl(K, Data, KTransOptions), NV = emqx_plugin_libs_rule:proc_tmpl(V, Data, VTransOptions), - case {NK, NV} of + case {NK0, NV} of {[undefined], _} -> {Data, Res}; %% undefined value in normal format [undefined] or int/uint format [undefined, <<"i">>] {_, [undefined | _]} -> {Data, Res}; _ -> + NK = list_to_binary(NK0), {Data, Res#{NK => value_type(NV)}} end. @@ -438,6 +485,8 @@ value_type([UInt, <<"u">>]) when is_integer(UInt) -> {uint, UInt}; +value_type([Float]) when is_float(Float) -> + Float; value_type([<<"t">>]) -> 't'; value_type([<<"T">>]) -> @@ -461,6 +510,9 @@ value_type([<<"False">>]) -> value_type(Val) -> Val. +key_filter(undefined) -> undefined; +key_filter(Value) -> emqx_plugin_libs_rule:bin(Value). + data_filter(undefined) -> undefined; data_filter(Int) when is_integer(Int) -> Int; data_filter(Number) when is_number(Number) -> Number; @@ -500,3 +552,56 @@ str(B) when is_binary(B) -> binary_to_list(B); str(S) when is_list(S) -> S. + +%%=================================================================== +%% eunit tests +%%=================================================================== + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). + +to_server_raw_test_() -> + [ + ?_assertEqual( + {"foobar", 1234}, + to_server_raw(<<"http://foobar:1234">>) + ), + ?_assertEqual( + {"foobar", 1234}, + to_server_raw(<<"https://foobar:1234">>) + ), + ?_assertEqual( + {"foobar", 1234}, + to_server_raw(<<"foobar:1234">>) + ) + ]. + +%% for coverage +desc_test_() -> + [ + ?_assertMatch( + {desc, _, _}, + desc(common) + ), + ?_assertMatch( + {desc, _, _}, + desc(influxdb_udp) + ), + ?_assertMatch( + {desc, _, _}, + desc(influxdb_api_v1) + ), + ?_assertMatch( + {desc, _, _}, + desc(influxdb_api_v2) + ), + ?_assertMatch( + {desc, _, _}, + server(desc) + ), + ?_assertMatch( + connector_influxdb, + namespace() + ) + ]. +-endif. diff --git a/rebar.config.erl b/rebar.config.erl index f745b5cca..bbfdfe4a8 100644 --- a/rebar.config.erl +++ b/rebar.config.erl @@ -150,7 +150,8 @@ test_deps() -> {bbmustache, "1.10.0"}, {meck, "0.9.2"}, {proper, "1.4.0"}, - {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}} + {er_coap_client, {git, "https://github.com/emqx/er_coap_client", {tag, "v1.0.5"}}}, + {erl_csv, "0.2.0"} ]. common_compile_opts(Vsn) -> diff --git a/scripts/ct/run.sh b/scripts/ct/run.sh index 45d32767c..38511ec3e 100755 --- a/scripts/ct/run.sh +++ b/scripts/ct/run.sh @@ -92,6 +92,11 @@ for dep in ${CT_DEPS}; do erlang24) FILES+=( '.ci/docker-compose-file/docker-compose.yaml' ) ;; + influxdb) + FILES+=( '.ci/docker-compose-file/docker-compose-toxiproxy.yaml' + '.ci/docker-compose-file/docker-compose-influxdb-tcp.yaml' + '.ci/docker-compose-file/docker-compose-influxdb-tls.yaml' ) + ;; mongo) FILES+=( '.ci/docker-compose-file/docker-compose-mongo-single-tcp.yaml' '.ci/docker-compose-file/docker-compose-mongo-single-tls.yaml' )