Compare commits
1 Commits
master
...
split_infl
Author | SHA1 | Date |
---|---|---|
![]() |
055541926d |
|
@ -165,7 +165,7 @@ roots() -> [].
|
|||
|
||||
fields("config_connector") ->
|
||||
emqx_connector_schema:common_fields() ++
|
||||
connection_fields() ++
|
||||
emqx_bridge_influxdb_connector:fields("connector") ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||
fields("post_api_v1") ->
|
||||
method_fields(post, influxdb_api_v1);
|
||||
|
@ -204,7 +204,7 @@ fields(Field) when
|
|||
Field == "post_connector"
|
||||
->
|
||||
Fields =
|
||||
connection_fields() ++
|
||||
emqx_bridge_influxdb_connector:fields("connector") ++
|
||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
|
||||
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
|
||||
fields(Field) when
|
||||
|
@ -219,19 +219,6 @@ fields(Type) when
|
|||
influxdb_bridge_common_fields() ++
|
||||
connector_fields(Type).
|
||||
|
||||
connection_fields() ->
|
||||
[
|
||||
emqx_bridge_influxdb_connector:server_field(),
|
||||
{parameters,
|
||||
mk(
|
||||
hoconsc:union([
|
||||
ref(emqx_bridge_influxdb_connector, "connector_" ++ T)
|
||||
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
|
||||
]),
|
||||
#{required => true, desc => ?DESC("influxdb_parameters")}
|
||||
)}
|
||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||
|
||||
method_fields(post, ConnectorType) ->
|
||||
influxdb_bridge_common_fields() ++
|
||||
connector_fields(ConnectorType) ++
|
||||
|
|
|
@ -40,7 +40,7 @@
|
|||
|
||||
-export([transform_bridge_v1_config_to_connector_config/1]).
|
||||
|
||||
-export([precision_field/0, server_field/0]).
|
||||
-export([precision_field/0]).
|
||||
|
||||
%% only for test
|
||||
-export([is_unrecoverable_error/1]).
|
||||
|
@ -232,28 +232,30 @@ namespace() -> connector_influxdb.
|
|||
roots() ->
|
||||
[
|
||||
{config, #{
|
||||
type => hoconsc:union(
|
||||
[
|
||||
hoconsc:ref(?MODULE, influxdb_api_v1),
|
||||
hoconsc:ref(?MODULE, influxdb_api_v2)
|
||||
]
|
||||
)
|
||||
type => hoconsc:ref(?MODULE, "connector")
|
||||
}}
|
||||
].
|
||||
|
||||
fields(common) ->
|
||||
fields("connector") ->
|
||||
[
|
||||
server_field(),
|
||||
precision_field()
|
||||
];
|
||||
parameter_field()
|
||||
] ++ emqx_connector_schema_lib:ssl_fields();
|
||||
fields("connector_influxdb_api_v1") ->
|
||||
[influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()];
|
||||
fields("connector_influxdb_api_v2") ->
|
||||
[influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()];
|
||||
%% ============ begin: schema for old bridge configs ============
|
||||
fields(influxdb_api_v1) ->
|
||||
fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields();
|
||||
fields(common) ++ influxdb_api_v1_fields();
|
||||
fields(influxdb_api_v2) ->
|
||||
fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields().
|
||||
fields(common) ++ influxdb_api_v2_fields();
|
||||
fields(common) ->
|
||||
[
|
||||
server_field(),
|
||||
precision_field()
|
||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||
%% ============ end: schema for old bridge configs ============
|
||||
|
||||
influxdb_type_field(Type) ->
|
||||
{influxdb_type, #{
|
||||
|
@ -262,6 +264,7 @@ influxdb_type_field(Type) ->
|
|||
default => Type,
|
||||
desc => ?DESC(atom_to_list(Type))
|
||||
}}.
|
||||
|
||||
server_field() ->
|
||||
{server, server()}.
|
||||
|
||||
|
@ -275,6 +278,16 @@ precision_field() ->
|
|||
required => false, default => ms, desc => ?DESC("precision")
|
||||
})}.
|
||||
|
||||
parameter_field() ->
|
||||
{parameters,
|
||||
mk(
|
||||
hoconsc:union([
|
||||
ref(?MODULE, "connector_" ++ T)
|
||||
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
|
||||
]),
|
||||
#{required => true, desc => ?DESC("influxdb_parameters")}
|
||||
)}.
|
||||
|
||||
influxdb_api_v1_fields() ->
|
||||
[
|
||||
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
|
||||
|
@ -304,6 +317,8 @@ desc(influxdb_api_v1) ->
|
|||
?DESC("influxdb_api_v1");
|
||||
desc(influxdb_api_v2) ->
|
||||
?DESC("influxdb_api_v2");
|
||||
desc("connector") ->
|
||||
?DESC("connector");
|
||||
desc("connector_influxdb_api_v1") ->
|
||||
?DESC("influxdb_api_v1");
|
||||
desc("connector_influxdb_api_v2") ->
|
||||
|
@ -411,8 +426,7 @@ client_config(
|
|||
{host, str(Host)},
|
||||
{port, Port},
|
||||
{pool_size, erlang:system_info(schedulers)},
|
||||
{pool, InstId},
|
||||
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
|
||||
{pool, InstId}
|
||||
] ++ protocol_config(Config).
|
||||
|
||||
%% api v1 config
|
||||
|
|
|
@ -943,6 +943,7 @@ t_create_disconnected(Config) ->
|
|||
?of_kind(influxdb_connector_start_failed, Trace),
|
||||
case Reason of
|
||||
econnrefused -> ok;
|
||||
closed -> ok;
|
||||
{closed, _} -> ok;
|
||||
_ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
|
||||
end,
|
||||
|
|
|
@ -66,7 +66,7 @@ t_lifecycle(Config) ->
|
|||
Port = ?config(influxdb_tcp_port, Config),
|
||||
perform_lifecycle_check(
|
||||
<<"emqx_bridge_influxdb_connector_SUITE">>,
|
||||
influxdb_config(Host, Port, false, <<"verify_none">>)
|
||||
influxdb_connector_config(Host, Port, false, <<"verify_none">>)
|
||||
).
|
||||
|
||||
perform_lifecycle_check(PoolName, InitialConfig) ->
|
||||
|
@ -76,6 +76,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
|||
% expects this
|
||||
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
|
||||
{ok, #{
|
||||
id := ResourceId,
|
||||
state := #{client := #{pool := ReturnedPoolName}} = State,
|
||||
status := InitialStatus
|
||||
}} = emqx_resource:create_local(
|
||||
|
@ -93,8 +94,18 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
|||
}} =
|
||||
emqx_resource:get_instance(PoolName),
|
||||
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
||||
%% install actions to the connector
|
||||
ActionConfig = influxdb_action_config(),
|
||||
ChannelId = <<"test_channel">>,
|
||||
?assertEqual(
|
||||
ok,
|
||||
emqx_resource_manager:add_channel(
|
||||
ResourceId, ChannelId, ActionConfig
|
||||
)
|
||||
),
|
||||
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
|
||||
% % Perform query as further check that the resource is working as expected
|
||||
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
|
||||
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
|
||||
?assertEqual(ok, emqx_resource:stop(PoolName)),
|
||||
% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||
% as the worker no longer exists.
|
||||
|
@ -116,7 +127,15 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
|||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
|
||||
emqx_resource:get_instance(PoolName),
|
||||
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
||||
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
|
||||
ChannelId = <<"test_channel">>,
|
||||
?assertEqual(
|
||||
ok,
|
||||
emqx_resource_manager:add_channel(
|
||||
ResourceId, ChannelId, ActionConfig
|
||||
)
|
||||
),
|
||||
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
|
||||
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
|
||||
% Stop and remove the resource in one go.
|
||||
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
|
||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
||||
|
@ -127,7 +146,7 @@ t_tls_verify_none(Config) ->
|
|||
PoolName = <<"testpool-1">>,
|
||||
Host = ?config(influxdb_tls_host, Config),
|
||||
Port = ?config(influxdb_tls_port, Config),
|
||||
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
|
||||
InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>),
|
||||
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
|
||||
?assertEqual(connected, ValidStatus),
|
||||
InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
|
||||
|
@ -138,7 +157,7 @@ t_tls_verify_peer(Config) ->
|
|||
PoolName = <<"testpool-2">>,
|
||||
Host = ?config(influxdb_tls_host, Config),
|
||||
Port = ?config(influxdb_tls_port, Config),
|
||||
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),
|
||||
InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>),
|
||||
%% This works without a CA-cert & friends since we are using a mock
|
||||
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
|
||||
?assertEqual(connected, ValidStatus),
|
||||
|
@ -191,19 +210,30 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
|
|||
% %% Helpers
|
||||
% %%------------------------------------------------------------------------------
|
||||
|
||||
influxdb_config(Host, Port, SslEnabled, Verify) ->
|
||||
influxdb_connector_config(Host, Port, SslEnabled, Verify) ->
|
||||
Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
|
||||
ResourceConfig = #{
|
||||
ConnectorConf = #{
|
||||
<<"parameters">> => #{
|
||||
<<"influxdb_type">> => <<"influxdb_api_v2">>,
|
||||
<<"bucket">> => <<"mqtt">>,
|
||||
<<"org">> => <<"emqx">>,
|
||||
<<"token">> => <<"abcdefg">>,
|
||||
<<"token">> => <<"abcdefg">>
|
||||
},
|
||||
<<"server">> => Server,
|
||||
<<"ssl">> => #{
|
||||
<<"enable">> => SslEnabled,
|
||||
<<"verify">> => Verify
|
||||
}
|
||||
},
|
||||
#{<<"config">> => ResourceConfig}.
|
||||
#{<<"config">> => ConnectorConf}.
|
||||
|
||||
influxdb_action_config() ->
|
||||
#{
|
||||
parameters => #{
|
||||
write_syntax => influxdb_write_syntax(),
|
||||
precision => ms
|
||||
}
|
||||
}.
|
||||
|
||||
custom_verify() ->
|
||||
fun
|
||||
|
@ -227,8 +257,8 @@ influxdb_write_syntax() ->
|
|||
}
|
||||
].
|
||||
|
||||
test_query() ->
|
||||
{send_message, #{
|
||||
test_query(ChannelId) ->
|
||||
{ChannelId, #{
|
||||
<<"clientid">> => <<"something">>,
|
||||
<<"payload">> => #{bool => true},
|
||||
<<"topic">> => <<"connector_test">>,
|
||||
|
|
|
@ -52,6 +52,11 @@ action_parameters.label:
|
|||
action_parameters.desc:
|
||||
"""Additional parameters specific to this action type"""
|
||||
|
||||
connector.label:
|
||||
"""InfluxDB Connector"""
|
||||
connector.desc:
|
||||
"""InfluxDB Connector Configs"""
|
||||
|
||||
influxdb_action.label:
|
||||
"""InfluxDB Action"""
|
||||
influxdb_action.desc:
|
||||
|
|
Loading…
Reference in New Issue