Compare commits
1 Commits
master
...
split_infl
Author | SHA1 | Date |
---|---|---|
![]() |
055541926d |
|
@ -165,7 +165,7 @@ roots() -> [].
|
||||||
|
|
||||||
fields("config_connector") ->
|
fields("config_connector") ->
|
||||||
emqx_connector_schema:common_fields() ++
|
emqx_connector_schema:common_fields() ++
|
||||||
connection_fields() ++
|
emqx_bridge_influxdb_connector:fields("connector") ++
|
||||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
|
||||||
fields("post_api_v1") ->
|
fields("post_api_v1") ->
|
||||||
method_fields(post, influxdb_api_v1);
|
method_fields(post, influxdb_api_v1);
|
||||||
|
@ -204,7 +204,7 @@ fields(Field) when
|
||||||
Field == "post_connector"
|
Field == "post_connector"
|
||||||
->
|
->
|
||||||
Fields =
|
Fields =
|
||||||
connection_fields() ++
|
emqx_bridge_influxdb_connector:fields("connector") ++
|
||||||
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
|
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
|
||||||
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
|
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
|
||||||
fields(Field) when
|
fields(Field) when
|
||||||
|
@ -219,19 +219,6 @@ fields(Type) when
|
||||||
influxdb_bridge_common_fields() ++
|
influxdb_bridge_common_fields() ++
|
||||||
connector_fields(Type).
|
connector_fields(Type).
|
||||||
|
|
||||||
connection_fields() ->
|
|
||||||
[
|
|
||||||
emqx_bridge_influxdb_connector:server_field(),
|
|
||||||
{parameters,
|
|
||||||
mk(
|
|
||||||
hoconsc:union([
|
|
||||||
ref(emqx_bridge_influxdb_connector, "connector_" ++ T)
|
|
||||||
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
|
|
||||||
]),
|
|
||||||
#{required => true, desc => ?DESC("influxdb_parameters")}
|
|
||||||
)}
|
|
||||||
] ++ emqx_connector_schema_lib:ssl_fields().
|
|
||||||
|
|
||||||
method_fields(post, ConnectorType) ->
|
method_fields(post, ConnectorType) ->
|
||||||
influxdb_bridge_common_fields() ++
|
influxdb_bridge_common_fields() ++
|
||||||
connector_fields(ConnectorType) ++
|
connector_fields(ConnectorType) ++
|
||||||
|
|
|
@ -40,7 +40,7 @@
|
||||||
|
|
||||||
-export([transform_bridge_v1_config_to_connector_config/1]).
|
-export([transform_bridge_v1_config_to_connector_config/1]).
|
||||||
|
|
||||||
-export([precision_field/0, server_field/0]).
|
-export([precision_field/0]).
|
||||||
|
|
||||||
%% only for test
|
%% only for test
|
||||||
-export([is_unrecoverable_error/1]).
|
-export([is_unrecoverable_error/1]).
|
||||||
|
@ -232,28 +232,30 @@ namespace() -> connector_influxdb.
|
||||||
roots() ->
|
roots() ->
|
||||||
[
|
[
|
||||||
{config, #{
|
{config, #{
|
||||||
type => hoconsc:union(
|
type => hoconsc:ref(?MODULE, "connector")
|
||||||
[
|
|
||||||
hoconsc:ref(?MODULE, influxdb_api_v1),
|
|
||||||
hoconsc:ref(?MODULE, influxdb_api_v2)
|
|
||||||
]
|
|
||||||
)
|
|
||||||
}}
|
}}
|
||||||
].
|
].
|
||||||
|
|
||||||
fields(common) ->
|
fields("connector") ->
|
||||||
[
|
[
|
||||||
server_field(),
|
server_field(),
|
||||||
precision_field()
|
parameter_field()
|
||||||
];
|
] ++ emqx_connector_schema_lib:ssl_fields();
|
||||||
fields("connector_influxdb_api_v1") ->
|
fields("connector_influxdb_api_v1") ->
|
||||||
[influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()];
|
[influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()];
|
||||||
fields("connector_influxdb_api_v2") ->
|
fields("connector_influxdb_api_v2") ->
|
||||||
[influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()];
|
[influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()];
|
||||||
|
%% ============ begin: schema for old bridge configs ============
|
||||||
fields(influxdb_api_v1) ->
|
fields(influxdb_api_v1) ->
|
||||||
fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields();
|
fields(common) ++ influxdb_api_v1_fields();
|
||||||
fields(influxdb_api_v2) ->
|
fields(influxdb_api_v2) ->
|
||||||
fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields().
|
fields(common) ++ influxdb_api_v2_fields();
|
||||||
|
fields(common) ->
|
||||||
|
[
|
||||||
|
server_field(),
|
||||||
|
precision_field()
|
||||||
|
] ++ emqx_connector_schema_lib:ssl_fields().
|
||||||
|
%% ============ end: schema for old bridge configs ============
|
||||||
|
|
||||||
influxdb_type_field(Type) ->
|
influxdb_type_field(Type) ->
|
||||||
{influxdb_type, #{
|
{influxdb_type, #{
|
||||||
|
@ -262,6 +264,7 @@ influxdb_type_field(Type) ->
|
||||||
default => Type,
|
default => Type,
|
||||||
desc => ?DESC(atom_to_list(Type))
|
desc => ?DESC(atom_to_list(Type))
|
||||||
}}.
|
}}.
|
||||||
|
|
||||||
server_field() ->
|
server_field() ->
|
||||||
{server, server()}.
|
{server, server()}.
|
||||||
|
|
||||||
|
@ -275,6 +278,16 @@ precision_field() ->
|
||||||
required => false, default => ms, desc => ?DESC("precision")
|
required => false, default => ms, desc => ?DESC("precision")
|
||||||
})}.
|
})}.
|
||||||
|
|
||||||
|
parameter_field() ->
|
||||||
|
{parameters,
|
||||||
|
mk(
|
||||||
|
hoconsc:union([
|
||||||
|
ref(?MODULE, "connector_" ++ T)
|
||||||
|
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
|
||||||
|
]),
|
||||||
|
#{required => true, desc => ?DESC("influxdb_parameters")}
|
||||||
|
)}.
|
||||||
|
|
||||||
influxdb_api_v1_fields() ->
|
influxdb_api_v1_fields() ->
|
||||||
[
|
[
|
||||||
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
|
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
|
||||||
|
@ -304,6 +317,8 @@ desc(influxdb_api_v1) ->
|
||||||
?DESC("influxdb_api_v1");
|
?DESC("influxdb_api_v1");
|
||||||
desc(influxdb_api_v2) ->
|
desc(influxdb_api_v2) ->
|
||||||
?DESC("influxdb_api_v2");
|
?DESC("influxdb_api_v2");
|
||||||
|
desc("connector") ->
|
||||||
|
?DESC("connector");
|
||||||
desc("connector_influxdb_api_v1") ->
|
desc("connector_influxdb_api_v1") ->
|
||||||
?DESC("influxdb_api_v1");
|
?DESC("influxdb_api_v1");
|
||||||
desc("connector_influxdb_api_v2") ->
|
desc("connector_influxdb_api_v2") ->
|
||||||
|
@ -411,8 +426,7 @@ client_config(
|
||||||
{host, str(Host)},
|
{host, str(Host)},
|
||||||
{port, Port},
|
{port, Port},
|
||||||
{pool_size, erlang:system_info(schedulers)},
|
{pool_size, erlang:system_info(schedulers)},
|
||||||
{pool, InstId},
|
{pool, InstId}
|
||||||
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
|
|
||||||
] ++ protocol_config(Config).
|
] ++ protocol_config(Config).
|
||||||
|
|
||||||
%% api v1 config
|
%% api v1 config
|
||||||
|
|
|
@ -943,6 +943,7 @@ t_create_disconnected(Config) ->
|
||||||
?of_kind(influxdb_connector_start_failed, Trace),
|
?of_kind(influxdb_connector_start_failed, Trace),
|
||||||
case Reason of
|
case Reason of
|
||||||
econnrefused -> ok;
|
econnrefused -> ok;
|
||||||
|
closed -> ok;
|
||||||
{closed, _} -> ok;
|
{closed, _} -> ok;
|
||||||
_ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
|
_ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -66,7 +66,7 @@ t_lifecycle(Config) ->
|
||||||
Port = ?config(influxdb_tcp_port, Config),
|
Port = ?config(influxdb_tcp_port, Config),
|
||||||
perform_lifecycle_check(
|
perform_lifecycle_check(
|
||||||
<<"emqx_bridge_influxdb_connector_SUITE">>,
|
<<"emqx_bridge_influxdb_connector_SUITE">>,
|
||||||
influxdb_config(Host, Port, false, <<"verify_none">>)
|
influxdb_connector_config(Host, Port, false, <<"verify_none">>)
|
||||||
).
|
).
|
||||||
|
|
||||||
perform_lifecycle_check(PoolName, InitialConfig) ->
|
perform_lifecycle_check(PoolName, InitialConfig) ->
|
||||||
|
@ -76,6 +76,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
||||||
% expects this
|
% expects this
|
||||||
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
|
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
|
||||||
{ok, #{
|
{ok, #{
|
||||||
|
id := ResourceId,
|
||||||
state := #{client := #{pool := ReturnedPoolName}} = State,
|
state := #{client := #{pool := ReturnedPoolName}} = State,
|
||||||
status := InitialStatus
|
status := InitialStatus
|
||||||
}} = emqx_resource:create_local(
|
}} = emqx_resource:create_local(
|
||||||
|
@ -93,8 +94,18 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
||||||
}} =
|
}} =
|
||||||
emqx_resource:get_instance(PoolName),
|
emqx_resource:get_instance(PoolName),
|
||||||
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
||||||
|
%% install actions to the connector
|
||||||
|
ActionConfig = influxdb_action_config(),
|
||||||
|
ChannelId = <<"test_channel">>,
|
||||||
|
?assertEqual(
|
||||||
|
ok,
|
||||||
|
emqx_resource_manager:add_channel(
|
||||||
|
ResourceId, ChannelId, ActionConfig
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
|
||||||
% % Perform query as further check that the resource is working as expected
|
% % Perform query as further check that the resource is working as expected
|
||||||
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
|
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
|
||||||
?assertEqual(ok, emqx_resource:stop(PoolName)),
|
?assertEqual(ok, emqx_resource:stop(PoolName)),
|
||||||
% Resource will be listed still, but state will be changed and healthcheck will fail
|
% Resource will be listed still, but state will be changed and healthcheck will fail
|
||||||
% as the worker no longer exists.
|
% as the worker no longer exists.
|
||||||
|
@ -116,7 +127,15 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
|
||||||
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
|
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
|
||||||
emqx_resource:get_instance(PoolName),
|
emqx_resource:get_instance(PoolName),
|
||||||
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
|
||||||
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
|
ChannelId = <<"test_channel">>,
|
||||||
|
?assertEqual(
|
||||||
|
ok,
|
||||||
|
emqx_resource_manager:add_channel(
|
||||||
|
ResourceId, ChannelId, ActionConfig
|
||||||
|
)
|
||||||
|
),
|
||||||
|
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
|
||||||
|
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
|
||||||
% Stop and remove the resource in one go.
|
% Stop and remove the resource in one go.
|
||||||
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
|
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
|
||||||
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
|
||||||
|
@ -127,7 +146,7 @@ t_tls_verify_none(Config) ->
|
||||||
PoolName = <<"testpool-1">>,
|
PoolName = <<"testpool-1">>,
|
||||||
Host = ?config(influxdb_tls_host, Config),
|
Host = ?config(influxdb_tls_host, Config),
|
||||||
Port = ?config(influxdb_tls_port, Config),
|
Port = ?config(influxdb_tls_port, Config),
|
||||||
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
|
InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>),
|
||||||
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
|
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
|
||||||
?assertEqual(connected, ValidStatus),
|
?assertEqual(connected, ValidStatus),
|
||||||
InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
|
InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
|
||||||
|
@ -138,7 +157,7 @@ t_tls_verify_peer(Config) ->
|
||||||
PoolName = <<"testpool-2">>,
|
PoolName = <<"testpool-2">>,
|
||||||
Host = ?config(influxdb_tls_host, Config),
|
Host = ?config(influxdb_tls_host, Config),
|
||||||
Port = ?config(influxdb_tls_port, Config),
|
Port = ?config(influxdb_tls_port, Config),
|
||||||
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),
|
InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>),
|
||||||
%% This works without a CA-cert & friends since we are using a mock
|
%% This works without a CA-cert & friends since we are using a mock
|
||||||
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
|
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
|
||||||
?assertEqual(connected, ValidStatus),
|
?assertEqual(connected, ValidStatus),
|
||||||
|
@ -191,19 +210,30 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
|
||||||
% %% Helpers
|
% %% Helpers
|
||||||
% %%------------------------------------------------------------------------------
|
% %%------------------------------------------------------------------------------
|
||||||
|
|
||||||
influxdb_config(Host, Port, SslEnabled, Verify) ->
|
influxdb_connector_config(Host, Port, SslEnabled, Verify) ->
|
||||||
Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
|
Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
|
||||||
ResourceConfig = #{
|
ConnectorConf = #{
|
||||||
<<"bucket">> => <<"mqtt">>,
|
<<"parameters">> => #{
|
||||||
<<"org">> => <<"emqx">>,
|
<<"influxdb_type">> => <<"influxdb_api_v2">>,
|
||||||
<<"token">> => <<"abcdefg">>,
|
<<"bucket">> => <<"mqtt">>,
|
||||||
|
<<"org">> => <<"emqx">>,
|
||||||
|
<<"token">> => <<"abcdefg">>
|
||||||
|
},
|
||||||
<<"server">> => Server,
|
<<"server">> => Server,
|
||||||
<<"ssl">> => #{
|
<<"ssl">> => #{
|
||||||
<<"enable">> => SslEnabled,
|
<<"enable">> => SslEnabled,
|
||||||
<<"verify">> => Verify
|
<<"verify">> => Verify
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
#{<<"config">> => ResourceConfig}.
|
#{<<"config">> => ConnectorConf}.
|
||||||
|
|
||||||
|
influxdb_action_config() ->
|
||||||
|
#{
|
||||||
|
parameters => #{
|
||||||
|
write_syntax => influxdb_write_syntax(),
|
||||||
|
precision => ms
|
||||||
|
}
|
||||||
|
}.
|
||||||
|
|
||||||
custom_verify() ->
|
custom_verify() ->
|
||||||
fun
|
fun
|
||||||
|
@ -227,8 +257,8 @@ influxdb_write_syntax() ->
|
||||||
}
|
}
|
||||||
].
|
].
|
||||||
|
|
||||||
test_query() ->
|
test_query(ChannelId) ->
|
||||||
{send_message, #{
|
{ChannelId, #{
|
||||||
<<"clientid">> => <<"something">>,
|
<<"clientid">> => <<"something">>,
|
||||||
<<"payload">> => #{bool => true},
|
<<"payload">> => #{bool => true},
|
||||||
<<"topic">> => <<"connector_test">>,
|
<<"topic">> => <<"connector_test">>,
|
||||||
|
|
|
@ -52,6 +52,11 @@ action_parameters.label:
|
||||||
action_parameters.desc:
|
action_parameters.desc:
|
||||||
"""Additional parameters specific to this action type"""
|
"""Additional parameters specific to this action type"""
|
||||||
|
|
||||||
|
connector.label:
|
||||||
|
"""InfluxDB Connector"""
|
||||||
|
connector.desc:
|
||||||
|
"""InfluxDB Connector Configs"""
|
||||||
|
|
||||||
influxdb_action.label:
|
influxdb_action.label:
|
||||||
"""InfluxDB Action"""
|
"""InfluxDB Action"""
|
||||||
influxdb_action.desc:
|
influxdb_action.desc:
|
||||||
|
|
Loading…
Reference in New Issue