Merge pull request #9895 from olcai/add-test-for-influxdb-ssl-opts

test(emqx_ee_connector): add basic tests for influxdb incl. SSL opts
This commit is contained in:
Erik Timan 2023-02-20 10:05:47 +01:00 committed by GitHub
commit 291755f81f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 251 additions and 48 deletions

View File

@ -31,12 +31,6 @@
conn_bridge_examples(Method) ->
[
#{
<<"influxdb_udp">> => #{
summary => <<"InfluxDB UDP Bridge">>,
value => values("influxdb_udp", Method)
}
},
#{
<<"influxdb_api_v1">> => #{
summary => <<"InfluxDB HTTP API V1 Bridge">>,
@ -71,12 +65,6 @@ values("influxdb_api_v1", post) ->
server => <<"127.0.0.1:8086">>
},
values(common, "influxdb_api_v1", SupportUint, TypeOpts);
values("influxdb_udp", post) ->
SupportUint = <<>>,
TypeOpts = #{
server => <<"127.0.0.1:8089">>
},
values(common, "influxdb_udp", SupportUint, TypeOpts);
values(Protocol, put) ->
values(Protocol, post).
@ -106,26 +94,20 @@ namespace() -> "bridge_influxdb".
roots() -> [].
fields("post_udp") ->
method_fileds(post, influxdb_udp);
fields("post_api_v1") ->
method_fileds(post, influxdb_api_v1);
fields("post_api_v2") ->
method_fileds(post, influxdb_api_v2);
fields("put_udp") ->
method_fileds(put, influxdb_udp);
fields("put_api_v1") ->
method_fileds(put, influxdb_api_v1);
fields("put_api_v2") ->
method_fileds(put, influxdb_api_v2);
fields("get_udp") ->
method_fileds(get, influxdb_udp);
fields("get_api_v1") ->
method_fileds(get, influxdb_api_v1);
fields("get_api_v2") ->
method_fileds(get, influxdb_api_v2);
fields(Type) when
Type == influxdb_udp orelse Type == influxdb_api_v1 orelse Type == influxdb_api_v2
Type == influxdb_api_v1 orelse Type == influxdb_api_v2
->
influxdb_bridge_common_fields() ++
connector_fields(Type).
@ -164,8 +146,6 @@ desc("config") ->
?DESC("desc_config");
desc(Method) when Method =:= "get"; Method =:= "put"; Method =:= "post" ->
["Configuration for InfluxDB using `", string:to_upper(Method), "` method."];
desc(influxdb_udp) ->
?DESC(emqx_ee_connector_influxdb, "influxdb_udp");
desc(influxdb_api_v1) ->
?DESC(emqx_ee_connector_influxdb, "influxdb_api_v1");
desc(influxdb_api_v2) ->

View File

@ -0,0 +1,2 @@
toxiproxy
influxdb

View File

@ -26,24 +26,14 @@ The InfluxDB default port 8086 is used if `[:Port]` is not specified."""
}
protocol {
desc {
en: """InfluxDB's protocol. UDP or HTTP API or HTTP API V2."""
zh: """InfluxDB 协议。UDP 或 HTTP API 或 HTTP API V2。"""
en: """InfluxDB's protocol. HTTP API or HTTP API V2."""
zh: """InfluxDB 协议。HTTP API 或 HTTP API V2。"""
}
label {
en: """Protocol"""
zh: """协议"""
}
}
influxdb_udp {
desc {
en: """InfluxDB's UDP protocol."""
zh: """InfluxDB UDP 协议。"""
}
label {
en: """UDP Protocol"""
zh: """UDP 协议"""
}
}
influxdb_api_v1 {
desc {
en: """InfluxDB's protocol. Support InfluxDB v1.8 and before."""

View File

@ -29,6 +29,7 @@
-export([reply_callback/2]).
-export([
roots/0,
namespace/0,
fields/1,
desc/1
@ -139,6 +140,18 @@ on_get_status(_InstId, #{client := Client}) ->
%% schema
namespace() -> connector_influxdb.
roots() ->
[
{config, #{
type => hoconsc:union(
[
hoconsc:ref(?MODULE, influxdb_api_v1),
hoconsc:ref(?MODULE, influxdb_api_v2)
]
)
}}
].
fields(common) ->
[
{server, server()},
@ -151,8 +164,6 @@ fields(common) ->
required => false, default => ms, desc => ?DESC("precision")
})}
];
fields(influxdb_udp) ->
fields(common);
fields(influxdb_api_v1) ->
fields(common) ++
[
@ -185,8 +196,6 @@ server() ->
desc(common) ->
?DESC("common");
desc(influxdb_udp) ->
?DESC("influxdb_udp");
desc(influxdb_api_v1) ->
?DESC("influxdb_api_v1");
desc(influxdb_api_v2) ->
@ -312,12 +321,7 @@ protocol_config(#{
{bucket, str(Bucket)},
{org, str(Org)},
{token, Token}
] ++ ssl_config(SSL);
%% udp config
protocol_config(_) ->
[
{protocol, udp}
].
] ++ ssl_config(SSL).
ssl_config(#{enable := false}) ->
[
@ -327,7 +331,7 @@ ssl_config(SSL = #{enable := true}) ->
[
{https_enabled, true},
{transport, ssl},
{transport_opts, maps:to_list(maps:remove(enable, SSL))}
{transport_opts, emqx_tls_lib:to_client_opts(SSL)}
].
username(#{username := Username}) ->
@ -645,10 +649,6 @@ desc_test_() ->
{desc, _, _},
desc(common)
),
?_assertMatch(
{desc, _, _},
desc(influxdb_udp)
),
?_assertMatch(
{desc, _, _},
desc(influxdb_api_v1)

View File

@ -0,0 +1,231 @@
%%--------------------------------------------------------------------
%% Copyright (c) 2023 EMQ Technologies Co., Ltd. All Rights Reserved.
%%--------------------------------------------------------------------
-module(emqx_ee_connector_influxdb_SUITE).
-compile(nowarn_export_all).
-compile(export_all).
-include("emqx_connector.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("common_test/include/ct.hrl").
-define(INFLUXDB_RESOURCE_MOD, emqx_ee_connector_influxdb).
all() ->
emqx_common_test_helpers:all(?MODULE).
groups() ->
[].
init_per_suite(Config) ->
InfluxDBTCPHost = os:getenv("INFLUXDB_APIV2_TCP_HOST", "toxiproxy"),
InfluxDBTCPPort = list_to_integer(os:getenv("INFLUXDB_APIV2_TCP_PORT", "8086")),
InfluxDBTLSHost = os:getenv("INFLUXDB_APIV2_TLS_HOST", "toxiproxy"),
InfluxDBTLSPort = list_to_integer(os:getenv("INFLUXDB_APIV2_TLS_PORT", "8087")),
Servers = [{InfluxDBTCPHost, InfluxDBTCPPort}, {InfluxDBTLSHost, InfluxDBTLSPort}],
case emqx_common_test_helpers:is_all_tcp_servers_available(Servers) of
true ->
ok = emqx_common_test_helpers:start_apps([emqx_conf]),
ok = emqx_connector_test_helpers:start_apps([emqx_resource]),
{ok, _} = application:ensure_all_started(emqx_connector),
[
{influxdb_tcp_host, InfluxDBTCPHost},
{influxdb_tcp_port, InfluxDBTCPPort},
{influxdb_tls_host, InfluxDBTLSHost},
{influxdb_tls_port, InfluxDBTLSPort}
| Config
];
false ->
case os:getenv("IS_CI") of
"yes" ->
throw(no_influxdb);
_ ->
{skip, no_influxdb}
end
end.
end_per_suite(_Config) ->
ok = emqx_common_test_helpers:stop_apps([emqx_conf]),
ok = emqx_connector_test_helpers:stop_apps([emqx_resource]),
_ = application:stop(emqx_connector).
init_per_testcase(_, Config) ->
Config.
end_per_testcase(_, _Config) ->
ok.
% %%------------------------------------------------------------------------------
% %% Testcases
% %%------------------------------------------------------------------------------
t_lifecycle(Config) ->
Host = ?config(influxdb_tcp_host, Config),
Port = ?config(influxdb_tcp_port, Config),
perform_lifecycle_check(
<<"emqx_ee_connector_influxdb_SUITE">>,
influxdb_config(Host, Port, false, <<"verify_none">>)
).
perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?INFLUXDB_RESOURCE_MOD, InitialConfig),
% We need to add a write_syntax to the config since the connector
% expects this
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
{ok, #{
state := #{client := #{pool := ReturnedPoolName}} = State,
status := InitialStatus
}} = emqx_resource:create_local(
PoolName,
?CONNECTOR_RESOURCE_GROUP,
?INFLUXDB_RESOURCE_MOD,
FullConfig,
#{}
),
?assertEqual(InitialStatus, connected),
% Instance should match the state and status of the just started resource
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := InitialStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
% % Perform query as further check that the resource is working as expected
?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
{ok, ?CONNECTOR_RESOURCE_GROUP, #{
state := State,
status := StoppedStatus
}} =
emqx_resource:get_instance(PoolName),
?assertEqual(stopped, StoppedStatus),
?assertEqual({error, resource_is_stopped}, emqx_resource:health_check(PoolName)),
% Resource healthcheck shortcuts things by checking ets. Go deeper by checking pool itself.
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Can call stop/1 again on an already stopped instance
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Make sure it can be restarted and the healthchecks and queries work properly
?assertEqual(ok, emqx_resource:restart(PoolName)),
% async restart, need to wait resource
timer:sleep(500),
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch(ok, emqx_resource:query(PoolName, test_query())),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
% Should not even be able to get the resource data out of ets now unlike just stopping.
?assertEqual({error, not_found}, emqx_resource:get_instance(PoolName)).
t_tls_verify_none(Config) ->
PoolName = <<"emqx_ee_connector_influxdb_SUITE">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
?assertEqual(connected, ValidStatus),
InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
?assertEqual(disconnected, InvalidStatus),
ok.
t_tls_verify_peer(Config) ->
PoolName = <<"emqx_ee_connector_influxdb_SUITE">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),
%% This works without a CA-cert & friends since we are using a mock
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
?assertEqual(connected, ValidStatus),
InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
?assertEqual(disconnected, InvalidStatus),
ok.
perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
{ok, #{config := CheckedConfig}} =
emqx_resource:check_config(?INFLUXDB_RESOURCE_MOD, InitialConfig),
% Meck handling of TLS opt handling so that we can inject custom
% verification returns
meck:new(emqx_tls_lib, [passthrough, no_link]),
meck:expect(
emqx_tls_lib,
to_client_opts,
fun(Opts) ->
Verify = {verify_fun, {custom_verify(), {return, VerifyReturn}}},
[Verify | meck:passthrough([Opts])]
end
),
try
% We need to add a write_syntax to the config since the connector
% expects this
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
{ok, #{
config := #{ssl := #{enable := SslEnabled}},
status := Status
}} = emqx_resource:create_local(
PoolName,
?CONNECTOR_RESOURCE_GROUP,
?INFLUXDB_RESOURCE_MOD,
FullConfig,
#{}
),
?assert(SslEnabled),
?assert(meck:validate(emqx_tls_lib)),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
Status
after
meck:unload(emqx_tls_lib)
end.
% %%------------------------------------------------------------------------------
% %% Helpers
% %%------------------------------------------------------------------------------
influxdb_config(Host, Port, SslEnabled, Verify) ->
Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
ResourceConfig = #{
<<"bucket">> => <<"mqtt">>,
<<"org">> => <<"emqx">>,
<<"token">> => <<"abcdefg">>,
<<"server">> => Server,
<<"ssl">> => #{
<<"enable">> => SslEnabled,
<<"verify">> => Verify
}
},
#{<<"config">> => ResourceConfig}.
custom_verify() ->
fun
(_, {bad_cert, unknown_ca} = Event, {return, Return} = UserState) ->
ct:pal("Call to custom verify fun. Event: ~p UserState: ~p", [Event, UserState]),
{Return, UserState};
(_, Event, UserState) ->
ct:pal("Unexpected call to custom verify fun. Event: ~p UserState: ~p", [
Event, UserState
]),
{fail, unexpected_call_to_verify_fun}
end.
influxdb_write_syntax() ->
[
#{
measurement => "${topic}",
tags => [{"clientid", "${clientid}"}],
fields => [{"payload", "${payload}"}],
timestamp => undefined
}
].
test_query() ->
{send_message, #{
<<"clientid">> => <<"something">>,
<<"payload">> => #{bool => true},
<<"topic">> => <<"connector_test">>
}}.