test(emqx_ee_connector): add basic tests for influxdb incl. SSL opts

This adds a test suite for the emqx_ee_connector_influxdb. We add it so
that SSL transport options are properly tested.
This commit is contained in:
Erik Timan 2023-02-02 14:53:17 +01:00
parent 3dea6b7913
commit cf77dcf25e
3 changed files with 212 additions and 0 deletions

View File

@ -0,0 +1,2 @@
toxiproxy
influxdb

View File

@ -29,6 +29,7 @@
-export([reply_callback/2]).
-export([
roots/0,
namespace/0,
fields/1,
desc/1
@ -139,6 +140,19 @@ on_get_status(_InstId, #{client := Client}) ->
%% schema
namespace() -> connector_influxdb.
roots() ->
[
{config, #{
type => hoconsc:union(
[
hoconsc:ref(?MODULE, influxdb_udp),
hoconsc:ref(?MODULE, influxdb_api_v1),
hoconsc:ref(?MODULE, influxdb_api_v2)
]
)
}}
].
fields(common) ->
[
{server, server()},

View File

@ -0,0 +1,196 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_influxdb_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(INFLUXDB_RESOURCE_MOD, emqx_ee_connector_influxdb).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
InfluxDBTCPHost = os:getenv("INFLUXDB_APIV2_TCP_HOST", "toxiproxy"),
InfluxDBTCPPort = list_to_integer(os:getenv("INFLUXDB_APIV2_TCP_PORT", "8086")),
InfluxDBTLSHost = os:getenv("INFLUXDB_APIV2_TLS_HOST", "toxiproxy"),
InfluxDBTLSPort = list_to_integer(os:getenv("INFLUXDB_APIV2_TLS_PORT", "8087")),
case emqx_common_test_helpers:is_tcp_server_available(InfluxDBTCPHost, InfluxDBTCPPort) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
[
{influxdb_tcp_host, InfluxDBTCPHost},
{influxdb_tcp_port, InfluxDBTCPPort},
{influxdb_tls_host, InfluxDBTLSHost},
{influxdb_tls_port, InfluxDBTLSPort}
| Config
];
false ->
{skip, no_influxdb}
end.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok.
% %%------------------------------------------------------------------------------
% %% Testcases
% %%------------------------------------------------------------------------------
t_lifecycle(Config) ->
Host = ?config(influxdb_tcp_host, Config),
Port = ?config(influxdb_tcp_port, Config),
perform_lifecycle_check(
<<"emqx_ee_connector_influxdb_SUITE">>,
influxdb_config(Host, Port, false, "verify_none")
).
perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?INFLUXDB_RESOURCE_MOD, InitialConfig),
% We need to add a write_syntax to the config since the connector
% expects this
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
{ok, #{
state := #{client := #{pool := ReturnedPoolName}} = State,
status := InitialStatus
}} = emqx_resource:create_local(
PoolName,
?CONNECTOR_RESOURCE_GROUP,
?INFLUXDB_RESOURCE_MOD,
FullConfig,
#{}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
t_tls_opts(Config) ->
PoolName = <<"emqx_ee_connector_influxdb_SUITE">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
VerifyNoneStatus = perform_tls_opts_check(
PoolName, influxdb_config(Host, Port, true, "verify_none")
),
?assertEqual(connected, VerifyNoneStatus),
VerifyPeerStatus = perform_tls_opts_check(
PoolName, influxdb_config(Host, Port, true, "verify_peer")
),
?assertEqual(disconnected, VerifyPeerStatus),
ok.
perform_tls_opts_check(PoolName, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?INFLUXDB_RESOURCE_MOD, InitialConfig),
% We need to add a write_syntax to the config since the connector
% expects this
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
{ok, #{
config := #{ssl := #{enable := SslEnabled}},
status := Status
}} = emqx_resource:create_local(
PoolName,
?CONNECTOR_RESOURCE_GROUP,
?INFLUXDB_RESOURCE_MOD,
FullConfig,
#{}
),
?assert(SslEnabled),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
Status.
% %%------------------------------------------------------------------------------
% %% Helpers
% %%------------------------------------------------------------------------------
influxdb_config(Host, Port, SslEnabled, Verify) ->
RawConfig = list_to_binary(
io_lib:format(
""
"\n"
" bucket = mqtt\n"
" org = emqx\n"
" token = abcdefg\n"
" server = \"~s:~b\"\n"
" ssl {\n"
" enable = ~s\n"
" verify = ~s\n"
" }\n"
" "
"",
[Host, Port, SslEnabled, Verify]
)
),
{ok, ResourceConfig} = hocon:binary(RawConfig),
#{<<"config">> => ResourceConfig}.
influxdb_write_syntax() ->
[
#{
measurement => "${topic}",
tags => [{"clientid", "${clientid}"}],
fields => [{"payload", "${payload}"}],
timestamp => undefined
}
].
test_query() ->
{send_message, #{
<<"clientid">> => <<"something">>,
<<"payload">> => #{bool => true},
<<"topic">> => <<"connector_test">>
}}.