fix: fix: update more influxdb testcases

This commit is contained in:
Shawn 2024-01-04 17:48:23 +08:00
parent 8ebd233a46
commit 055541926d
5 changed files with 79 additions and 42 deletions

View File

@ -165,7 +165,7 @@ roots() -> [].
fields("config_connector") -> fields("config_connector") ->
emqx_connector_schema:common_fields() ++ emqx_connector_schema:common_fields() ++
connection_fields() ++ emqx_bridge_influxdb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields("post_api_v1") -> fields("post_api_v1") ->
method_fields(post, influxdb_api_v1); method_fields(post, influxdb_api_v1);
@ -204,7 +204,7 @@ fields(Field) when
Field == "post_connector" Field == "post_connector"
-> ->
Fields = Fields =
connection_fields() ++ emqx_bridge_influxdb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(Field) when fields(Field) when
@ -219,19 +219,6 @@ fields(Type) when
influxdb_bridge_common_fields() ++ influxdb_bridge_common_fields() ++
connector_fields(Type). connector_fields(Type).
connection_fields() ->
[
emqx_bridge_influxdb_connector:server_field(),
{parameters,
mk(
hoconsc:union([
ref(emqx_bridge_influxdb_connector, "connector_" ++ T)
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
]),
#{required => true, desc => ?DESC("influxdb_parameters")}
)}
] ++ emqx_connector_schema_lib:ssl_fields().
method_fields(post, ConnectorType) -> method_fields(post, ConnectorType) ->
influxdb_bridge_common_fields() ++ influxdb_bridge_common_fields() ++
connector_fields(ConnectorType) ++ connector_fields(ConnectorType) ++

View File

@ -40,7 +40,7 @@
-export([transform_bridge_v1_config_to_connector_config/1]). -export([transform_bridge_v1_config_to_connector_config/1]).
-export([precision_field/0, server_field/0]). -export([precision_field/0]).
%% only for test %% only for test
-export([is_unrecoverable_error/1]). -export([is_unrecoverable_error/1]).
@ -232,28 +232,30 @@ namespace() -> connector_influxdb.
roots() -> roots() ->
[ [
{config, #{ {config, #{
type => hoconsc:union( type => hoconsc:ref(?MODULE, "connector")
[
hoconsc:ref(?MODULE, influxdb_api_v1),
hoconsc:ref(?MODULE, influxdb_api_v2)
]
)
}} }}
]. ].
fields(common) -> fields("connector") ->
[ [
server_field(), server_field(),
precision_field() parameter_field()
]; ] ++ emqx_connector_schema_lib:ssl_fields();
fields("connector_influxdb_api_v1") -> fields("connector_influxdb_api_v1") ->
[influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()]; [influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()];
fields("connector_influxdb_api_v2") -> fields("connector_influxdb_api_v2") ->
[influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()]; [influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()];
%% ============ begin: schema for old bridge configs ============
fields(influxdb_api_v1) -> fields(influxdb_api_v1) ->
fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields(); fields(common) ++ influxdb_api_v1_fields();
fields(influxdb_api_v2) -> fields(influxdb_api_v2) ->
fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields(). fields(common) ++ influxdb_api_v2_fields();
fields(common) ->
[
server_field(),
precision_field()
] ++ emqx_connector_schema_lib:ssl_fields().
%% ============ end: schema for old bridge configs ============
influxdb_type_field(Type) -> influxdb_type_field(Type) ->
{influxdb_type, #{ {influxdb_type, #{
@ -262,6 +264,7 @@ influxdb_type_field(Type) ->
default => Type, default => Type,
desc => ?DESC(atom_to_list(Type)) desc => ?DESC(atom_to_list(Type))
}}. }}.
server_field() -> server_field() ->
{server, server()}. {server, server()}.
@ -275,6 +278,16 @@ precision_field() ->
required => false, default => ms, desc => ?DESC("precision") required => false, default => ms, desc => ?DESC("precision")
})}. })}.
parameter_field() ->
{parameters,
mk(
hoconsc:union([
ref(?MODULE, "connector_" ++ T)
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
]),
#{required => true, desc => ?DESC("influxdb_parameters")}
)}.
influxdb_api_v1_fields() -> influxdb_api_v1_fields() ->
[ [
{database, mk(binary(), #{required => true, desc => ?DESC("database")})}, {database, mk(binary(), #{required => true, desc => ?DESC("database")})},
@ -304,6 +317,8 @@ desc(influxdb_api_v1) ->
?DESC("influxdb_api_v1"); ?DESC("influxdb_api_v1");
desc(influxdb_api_v2) -> desc(influxdb_api_v2) ->
?DESC("influxdb_api_v2"); ?DESC("influxdb_api_v2");
desc("connector") ->
?DESC("connector");
desc("connector_influxdb_api_v1") -> desc("connector_influxdb_api_v1") ->
?DESC("influxdb_api_v1"); ?DESC("influxdb_api_v1");
desc("connector_influxdb_api_v2") -> desc("connector_influxdb_api_v2") ->
@ -411,8 +426,7 @@ client_config(
{host, str(Host)}, {host, str(Host)},
{port, Port}, {port, Port},
{pool_size, erlang:system_info(schedulers)}, {pool_size, erlang:system_info(schedulers)},
{pool, InstId}, {pool, InstId}
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
] ++ protocol_config(Config). ] ++ protocol_config(Config).
%% api v1 config %% api v1 config

View File

@ -943,6 +943,7 @@ t_create_disconnected(Config) ->
?of_kind(influxdb_connector_start_failed, Trace), ?of_kind(influxdb_connector_start_failed, Trace),
case Reason of case Reason of
econnrefused -> ok; econnrefused -> ok;
closed -> ok;
{closed, _} -> ok; {closed, _} -> ok;
_ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
end, end,

View File

@ -66,7 +66,7 @@ t_lifecycle(Config) ->
Port = ?config(influxdb_tcp_port, Config), Port = ?config(influxdb_tcp_port, Config),
perform_lifecycle_check( perform_lifecycle_check(
<<"emqx_bridge_influxdb_connector_SUITE">>, <<"emqx_bridge_influxdb_connector_SUITE">>,
influxdb_config(Host, Port, false, <<"verify_none">>) influxdb_connector_config(Host, Port, false, <<"verify_none">>)
). ).
perform_lifecycle_check(PoolName, InitialConfig) -> perform_lifecycle_check(PoolName, InitialConfig) ->
@ -76,6 +76,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
% expects this % expects this
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()}, FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
{ok, #{ {ok, #{
id := ResourceId,
state := #{client := #{pool := ReturnedPoolName}} = State, state := #{client := #{pool := ReturnedPoolName}} = State,
status := InitialStatus status := InitialStatus
}} = emqx_resource:create_local( }} = emqx_resource:create_local(
@ -93,8 +94,18 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} = }} =
emqx_resource:get_instance(PoolName), emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
%% install actions to the connector
ActionConfig = influxdb_action_config(),
ChannelId = <<"test_channel">>,
?assertEqual(
ok,
emqx_resource_manager:add_channel(
ResourceId, ChannelId, ActionConfig
)
),
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
% % Perform query as further check that the resource is working as expected % % Perform query as further check that the resource is working as expected
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
?assertEqual(ok, emqx_resource:stop(PoolName)), ?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail % Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists. % as the worker no longer exists.
@ -116,7 +127,15 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName), emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), ChannelId = <<"test_channel">>,
?assertEqual(
ok,
emqx_resource_manager:add_channel(
ResourceId, ChannelId, ActionConfig
)
),
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
% Stop and remove the resource in one go. % Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
@ -127,7 +146,7 @@ t_tls_verify_none(Config) ->
PoolName = <<"testpool-1">>, PoolName = <<"testpool-1">>,
Host = ?config(influxdb_tls_host, Config), Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config), Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>), InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>),
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
?assertEqual(connected, ValidStatus), ?assertEqual(connected, ValidStatus),
InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail), InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
@ -138,7 +157,7 @@ t_tls_verify_peer(Config) ->
PoolName = <<"testpool-2">>, PoolName = <<"testpool-2">>,
Host = ?config(influxdb_tls_host, Config), Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config), Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>), InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>),
%% This works without a CA-cert & friends since we are using a mock %% This works without a CA-cert & friends since we are using a mock
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
?assertEqual(connected, ValidStatus), ?assertEqual(connected, ValidStatus),
@ -191,19 +210,30 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
% %% Helpers % %% Helpers
% %%------------------------------------------------------------------------------ % %%------------------------------------------------------------------------------
influxdb_config(Host, Port, SslEnabled, Verify) -> influxdb_connector_config(Host, Port, SslEnabled, Verify) ->
Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])), Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
ResourceConfig = #{ ConnectorConf = #{
<<"bucket">> => <<"mqtt">>, <<"parameters">> => #{
<<"org">> => <<"emqx">>, <<"influxdb_type">> => <<"influxdb_api_v2">>,
<<"token">> => <<"abcdefg">>, <<"bucket">> => <<"mqtt">>,
<<"org">> => <<"emqx">>,
<<"token">> => <<"abcdefg">>
},
<<"server">> => Server, <<"server">> => Server,
<<"ssl">> => #{ <<"ssl">> => #{
<<"enable">> => SslEnabled, <<"enable">> => SslEnabled,
<<"verify">> => Verify <<"verify">> => Verify
} }
}, },
#{<<"config">> => ResourceConfig}. #{<<"config">> => ConnectorConf}.
influxdb_action_config() ->
#{
parameters => #{
write_syntax => influxdb_write_syntax(),
precision => ms
}
}.
custom_verify() -> custom_verify() ->
fun fun
@ -227,8 +257,8 @@ influxdb_write_syntax() ->
} }
]. ].
test_query() -> test_query(ChannelId) ->
{send_message, #{ {ChannelId, #{
<<"clientid">> => <<"something">>, <<"clientid">> => <<"something">>,
<<"payload">> => #{bool => true}, <<"payload">> => #{bool => true},
<<"topic">> => <<"connector_test">>, <<"topic">> => <<"connector_test">>,

View File

@ -52,6 +52,11 @@ action_parameters.label:
action_parameters.desc: action_parameters.desc:
"""Additional parameters specific to this action type""" """Additional parameters specific to this action type"""
connector.label:
"""InfluxDB Connector"""
connector.desc:
"""InfluxDB Connector Configs"""
influxdb_action.label: influxdb_action.label:
"""InfluxDB Action""" """InfluxDB Action"""
influxdb_action.desc: influxdb_action.desc: