Merge pull request #12247 from emqx/split_influxdb_connector_actions

refactor: split influxdb bridges to actions and connectors
This commit is contained in:
Xinyu Liu 2024-01-05 14:07:35 +08:00 committed by GitHub
commit da3937b80e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 465 additions and 114 deletions

View File

@ -80,6 +80,7 @@ hard_coded_action_info_modules_ee() ->
emqx_bridge_kafka_action_info,
emqx_bridge_matrix_action_info,
emqx_bridge_mongodb_action_info,
emqx_bridge_influxdb_action_info,
emqx_bridge_mysql_action_info,
emqx_bridge_pgsql_action_info,
emqx_bridge_syskeeper_action_info,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_bridge, [
{description, "EMQX bridges"},
{vsn, "0.1.32"},
{vsn, "0.1.33"},
{registered, [emqx_bridge_sup]},
{mod, {emqx_bridge_app, []}},
{applications, [

View File

@ -3,7 +3,7 @@
{erl_opts, [debug_info]}.
{deps, [
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.11"}}},
{influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.12"}}},
{emqx_connector, {path, "../../apps/emqx_connector"}},
{emqx_resource, {path, "../../apps/emqx_resource"}},
{emqx_bridge, {path, "../../apps/emqx_bridge"}}

View File

@ -1,6 +1,6 @@
{application, emqx_bridge_influxdb, [
{description, "EMQX Enterprise InfluxDB Bridge"},
{vsn, "0.1.7"},
{vsn, "0.2.0"},
{registered, []},
{applications, [
kernel,
@ -8,7 +8,7 @@
emqx_resource,
influxdb
]},
{env, []},
{env, [{emqx_action_info_modules, [emqx_bridge_influxdb_action_info]}]},
{modules, []},
{links, []}
]}.

View File

@ -11,7 +11,6 @@
-import(hoconsc, [mk/2, enum/1, ref/2]).
-export([
conn_bridge_examples/1,
write_syntax_type/0
]).
@ -22,17 +21,28 @@
desc/1
]).
%% Examples
-export([
bridge_v2_examples/1,
conn_bridge_examples/1,
connector_examples/1
]).
-type write_syntax() :: list().
-reflect_type([write_syntax/0]).
-typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}).
-export([to_influx_lines/1]).
-define(CONNECTOR_TYPE, influxdb).
-define(ACTION_TYPE, influxdb).
%% -------------------------------------------------------------------------------------------------
%% api
write_syntax_type() ->
typerefl:alias("string", write_syntax()).
%% Examples
conn_bridge_examples(Method) ->
[
#{
@ -49,25 +59,80 @@ conn_bridge_examples(Method) ->
}
].
bridge_v2_examples(Method) ->
WriteExample =
<<"${topic},clientid=${clientid} ", "payload=${payload},",
"${clientid}_int_value=${payload.int_key}i,", "bool=${payload.bool}">>,
ParamsExample = #{
parameters => #{
write_syntax => WriteExample, precision => ms
}
},
[
#{
<<"influxdb">> => #{
summary => <<"InfluxDB Action">>,
value => emqx_bridge_v2_schema:action_values(
Method, influxdb, influxdb, ParamsExample
)
}
}
].
connector_examples(Method) ->
[
#{
<<"influxdb_api_v1">> => #{
summary => <<"InfluxDB HTTP API V1 Connector">>,
value => emqx_connector_schema:connector_values(
Method, influxdb, connector_values(influxdb_api_v1)
)
}
},
#{
<<"influxdb_api_v2">> => #{
summary => <<"InfluxDB HTTP API V2 Connector">>,
value => emqx_connector_schema:connector_values(
Method, influxdb, connector_values(influxdb_api_v2)
)
}
}
].
connector_values(Type) ->
maps:merge(basic_connector_values(), #{parameters => connector_values_v(Type)}).
connector_values_v(influxdb_api_v2) ->
#{
influxdb_type => influxdb_api_v2,
bucket => <<"example_bucket">>,
org => <<"examlpe_org">>,
token => <<"example_token">>
};
connector_values_v(influxdb_api_v1) ->
#{
influxdb_type => influxdb_api_v1,
database => <<"example_database">>,
username => <<"example_username">>,
password => <<"******">>
}.
basic_connector_values() ->
#{
enable => true,
server => <<"127.0.0.1:8086">>,
ssl => #{enable => false}
}.
values(Protocol, get) ->
values(Protocol, post);
values("influxdb_api_v2", post) ->
SupportUint = <<"uint_value=${payload.uint_key}u,">>,
TypeOpts = #{
bucket => <<"example_bucket">>,
org => <<"examlpe_org">>,
token => <<"example_token">>,
server => <<"127.0.0.1:8086">>
},
TypeOpts = connector_values_v(influxdb_api_v2),
values(common, "influxdb_api_v2", SupportUint, TypeOpts);
values("influxdb_api_v1", post) ->
SupportUint = <<>>,
TypeOpts = #{
database => <<"example_database">>,
username => <<"example_username">>,
password => <<"******">>,
server => <<"127.0.0.1:8086">>
},
TypeOpts = connector_values_v(influxdb_api_v1),
values(common, "influxdb_api_v1", SupportUint, TypeOpts);
values(Protocol, put) ->
values(Protocol, post).
@ -98,6 +163,10 @@ namespace() -> "bridge_influxdb".
roots() -> [].
fields("config_connector") ->
emqx_connector_schema:common_fields() ++
emqx_bridge_influxdb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts);
fields("post_api_v1") ->
method_fields(post, influxdb_api_v1);
fields("post_api_v2") ->
@ -110,6 +179,40 @@ fields("get_api_v1") ->
method_fields(get, influxdb_api_v1);
fields("get_api_v2") ->
method_fields(get, influxdb_api_v2);
fields(action) ->
{influxdb,
mk(
hoconsc:map(name, ref(?MODULE, influxdb_action)),
#{desc => <<"InfluxDB Action Config">>, required => false}
)};
fields(influxdb_action) ->
emqx_bridge_v2_schema:make_producer_action_schema(
mk(ref(?MODULE, action_parameters), #{
required => true, desc => ?DESC(action_parameters)
})
);
fields(action_parameters) ->
[
{write_syntax, fun write_syntax/1},
emqx_bridge_influxdb_connector:precision_field()
];
fields(connector_resource_opts) ->
emqx_connector_schema:resource_opts_fields();
fields(Field) when
Field == "get_connector";
Field == "put_connector";
Field == "post_connector"
->
Fields =
emqx_bridge_influxdb_connector:fields("connector") ++
emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts),
emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields);
fields(Field) when
Field == "get_bridge_v2";
Field == "post_bridge_v2";
Field == "put_bridge_v2"
->
emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(influxdb_action));
fields(Type) when
Type == influxdb_api_v1 orelse Type == influxdb_api_v2
->
@ -154,6 +257,14 @@ desc(influxdb_api_v1) ->
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1");
desc(influxdb_api_v2) ->
?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2");
desc(influxdb_action) ->
?DESC(influxdb_action);
desc(action_parameters) ->
?DESC(action_parameters);
desc("config_connector") ->
?DESC("desc_config");
desc(connector_resource_opts) ->
?DESC(emqx_resource_schema, "resource_opts");
desc(_) ->
undefined.

View File

@ -0,0 +1,76 @@
-module(emqx_bridge_influxdb_action_info).
-behaviour(emqx_action_info).
-export([
bridge_v1_config_to_action_config/2,
bridge_v1_config_to_connector_config/1,
connector_action_config_to_bridge_v1_config/2,
bridge_v1_type_name/0,
action_type_name/0,
connector_type_name/0,
schema_module/0
]).
%% dynamic callback
-export([
bridge_v1_type_name_fun/1
]).
-import(emqx_utils_conv, [bin/1]).
-define(SCHEMA_MODULE, emqx_bridge_influxdb).
bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) ->
ActionTopLevelKeys = schema_keys(influxdb_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1,
ActionConfig#{<<"connector">> => ConnectorName}
).
bridge_v1_config_to_connector_config(BridgeV1Config) ->
ActionTopLevelKeys = schema_keys(influxdb_action),
ActionParametersKeys = schema_keys(action_parameters),
ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys,
ConnectorTopLevelKeys = schema_keys("config_connector"),
ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys),
ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys,
ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config),
emqx_utils_maps:update_if_present(
<<"resource_opts">>,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnConfig0
).
connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) ->
RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config(
ConnectorRawConf, ActionRawConf
),
maps:without([<<"influxdb_type">>], RawConf).
bridge_v1_type_name() ->
{fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}.
action_type_name() -> influxdb.
connector_type_name() -> influxdb.
schema_module() -> ?SCHEMA_MODULE.
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"database">> := _}}, _}) ->
influxdb_api_v1;
bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"bucket">> := _}}, _}) ->
influxdb_api_v2.
make_config_map(PickKeys, IndentKeys, Config) ->
Conf0 = maps:with(PickKeys, Config),
emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0).
schema_keys(Name) ->
[bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))].
bridge_v1_type_names() -> [influxdb, influxdb_api_v1, influxdb_api_v2].

View File

@ -19,6 +19,10 @@
callback_mode/0,
on_start/2,
on_stop/2,
on_add_channel/4,
on_remove_channel/3,
on_get_channel_status/3,
on_get_channels/1,
on_query/3,
on_batch_query/3,
on_query_async/4,
@ -34,6 +38,10 @@
desc/1
]).
-export([transform_bridge_v1_config_to_connector_config/1]).
-export([precision_field/0]).
%% only for test
-export([is_unrecoverable_error/1]).
@ -55,6 +63,38 @@
%% resource callback
callback_mode() -> async_if_possible.
on_add_channel(
_InstanceId,
#{channels := Channels, client := Client} = OldState,
ChannelId,
#{parameters := Parameters} = ChannelConfig0
) ->
#{write_syntax := WriteSytaxTmpl} = Parameters,
Precision = maps:get(precision, Parameters, ms),
ChannelConfig = maps:merge(
Parameters,
ChannelConfig0#{
channel_client => influxdb:update_precision(Client, Precision),
write_syntax => to_config(WriteSytaxTmpl, Precision)
}
),
{ok, OldState#{
channels => maps:put(ChannelId, ChannelConfig, Channels)
}}.
on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) ->
NewState = State#{channels => maps:remove(ChannelId, Channels)},
{ok, NewState}.
on_get_channel_status(InstanceId, _ChannelId, State) ->
case on_get_status(InstanceId, State) of
connected -> connected;
_ -> connecting
end.
on_get_channels(InstanceId) ->
emqx_bridge_v2:get_channels_for_connector(InstanceId).
on_start(InstId, Config) ->
%% InstID as pool would be handled by influxdb client
%% so there is no need to allocate pool_name here
@ -73,8 +113,10 @@ on_stop(InstId, _State) ->
ok
end.
on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) ->
case data_to_points(Data, SyntaxLines) of
on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) ->
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case data_to_points(Message, SyntaxLines) of
{ok, Points} ->
?tp(
influxdb_connector_send_query,
@ -92,7 +134,10 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c
%% Once a Batched Data trans to points failed.
%% This batch query failed
on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) ->
on_batch_query(InstId, BatchData, #{channels := ChannelConf}) ->
[{Channel, _} | _] = BatchData,
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
?tp(
@ -110,11 +155,13 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client
on_query_async(
InstId,
{send_message, Data},
{Channel, Message},
{ReplyFun, Args},
_State = #{write_syntax := SyntaxLines, client := Client}
#{channels := ChannelConf}
) ->
case data_to_points(Data, SyntaxLines) of
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case data_to_points(Message, SyntaxLines) of
{ok, Points} ->
?tp(
influxdb_connector_send_query,
@ -134,8 +181,11 @@ on_batch_query_async(
InstId,
BatchData,
{ReplyFun, Args},
#{write_syntax := SyntaxLines, client := Client}
#{channels := ChannelConf}
) ->
[{Channel, _} | _] = BatchData,
#{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf),
#{channel_client := Client} = maps:get(Channel, ChannelConf),
case parse_batch_data(InstId, BatchData, SyntaxLines) of
{ok, Points} ->
?tp(
@ -159,6 +209,22 @@ on_get_status(_InstId, #{client := Client}) ->
disconnected
end.
transform_bridge_v1_config_to_connector_config(BridgeV1Config) ->
IndentKeys = [username, password, database, token, bucket, org],
ConnConfig0 = maps:without([write_syntax, precision], BridgeV1Config),
ConnConfig1 =
case emqx_utils_maps:indent(parameters, IndentKeys, ConnConfig0) of
#{parameters := #{database := _} = Params} = Conf ->
Conf#{parameters => Params#{influxdb_type => influxdb_api_v1}};
#{parameters := #{bucket := _} = Params} = Conf ->
Conf#{parameters => Params#{influxdb_type => influxdb_api_v2}}
end,
emqx_utils_maps:update_if_present(
resource_opts,
fun emqx_connector_schema:project_to_connector_resource_opts/1,
ConnConfig1
).
%% -------------------------------------------------------------------------------------------------
%% schema
namespace() -> connector_influxdb.
@ -166,41 +232,75 @@ namespace() -> connector_influxdb.
roots() ->
[
{config, #{
type => hoconsc:union(
[
hoconsc:ref(?MODULE, influxdb_api_v1),
hoconsc:ref(?MODULE, influxdb_api_v2)
]
)
type => hoconsc:ref(?MODULE, "connector")
}}
].
fields("connector") ->
[
server_field(),
parameter_field()
] ++ emqx_connector_schema_lib:ssl_fields();
fields("connector_influxdb_api_v1") ->
[influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()];
fields("connector_influxdb_api_v2") ->
[influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()];
%% ============ begin: schema for old bridge configs ============
fields(influxdb_api_v1) ->
fields(common) ++ influxdb_api_v1_fields();
fields(influxdb_api_v2) ->
fields(common) ++ influxdb_api_v2_fields();
fields(common) ->
[
{server, server()},
{precision,
%% The influxdb only supports these 4 precision:
%% See "https://github.com/influxdata/influxdb/blob/
%% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for
%% more information.
mk(enum([ns, us, ms, s]), #{
required => false, default => ms, desc => ?DESC("precision")
})}
];
fields(influxdb_api_v1) ->
fields(common) ++
[
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
{username, mk(binary(), #{desc => ?DESC("username")})},
{password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
] ++ emqx_connector_schema_lib:ssl_fields();
fields(influxdb_api_v2) ->
fields(common) ++
[
{bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
{org, mk(binary(), #{required => true, desc => ?DESC("org")})},
{token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})}
] ++ emqx_connector_schema_lib:ssl_fields().
server_field(),
precision_field()
] ++ emqx_connector_schema_lib:ssl_fields().
%% ============ end: schema for old bridge configs ============
influxdb_type_field(Type) ->
{influxdb_type, #{
required => true,
type => Type,
default => Type,
desc => ?DESC(atom_to_list(Type))
}}.
server_field() ->
{server, server()}.
precision_field() ->
{precision,
%% The influxdb only supports these 4 precision:
%% See "https://github.com/influxdata/influxdb/blob/
%% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for
%% more information.
mk(enum([ns, us, ms, s]), #{
required => false, default => ms, desc => ?DESC("precision")
})}.
parameter_field() ->
{parameters,
mk(
hoconsc:union([
ref(?MODULE, "connector_" ++ T)
|| T <- ["influxdb_api_v1", "influxdb_api_v2"]
]),
#{required => true, desc => ?DESC("influxdb_parameters")}
)}.
influxdb_api_v1_fields() ->
[
{database, mk(binary(), #{required => true, desc => ?DESC("database")})},
{username, mk(binary(), #{desc => ?DESC("username")})},
{password, emqx_schema_secret:mk(#{desc => ?DESC("password")})}
].
influxdb_api_v2_fields() ->
[
{bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})},
{org, mk(binary(), #{required => true, desc => ?DESC("org")})},
{token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})}
].
server() ->
Meta = #{
@ -213,9 +313,19 @@ server() ->
desc(common) ->
?DESC("common");
desc(parameters) ->
?DESC("influxdb_parameters");
desc("influxdb_parameters") ->
?DESC("influxdb_parameters");
desc(influxdb_api_v1) ->
?DESC("influxdb_api_v1");
desc(influxdb_api_v2) ->
?DESC("influxdb_api_v2");
desc("connector") ->
?DESC("connector");
desc("connector_influxdb_api_v1") ->
?DESC("influxdb_api_v1");
desc("connector_influxdb_api_v2") ->
?DESC("influxdb_api_v2").
%% -------------------------------------------------------------------------------------------------
@ -248,22 +358,14 @@ start_client(InstId, Config) ->
{error, R}
end.
do_start_client(
InstId,
ClientConfig,
Config = #{write_syntax := Lines}
) ->
Precision = maps:get(precision, Config, ms),
do_start_client(InstId, ClientConfig, Config) ->
case influxdb:start_client(ClientConfig) of
{ok, Client} ->
case influxdb:is_alive(Client, true) of
true ->
case influxdb:check_auth(Client) of
ok ->
State = #{
client => Client,
write_syntax => to_config(Lines, Precision)
},
State = #{client => Client, channels => #{}},
?SLOG(info, #{
msg => "starting_influxdb_connector_success",
connector => InstId,
@ -328,28 +430,21 @@ client_config(
{host, str(Host)},
{port, Port},
{pool_size, erlang:system_info(schedulers)},
{pool, InstId},
{precision, atom_to_binary(maps:get(precision, Config, ms), utf8)}
{pool, InstId}
] ++ protocol_config(Config).
%% api v1 config
protocol_config(
#{
database := DB,
ssl := SSL
} = Config
) ->
protocol_config(#{
parameters := #{influxdb_type := influxdb_api_v1, database := DB} = Params, ssl := SSL
}) ->
[
{protocol, http},
{version, v1},
{database, str(DB)}
] ++ username(Config) ++
password(Config) ++ ssl_config(SSL);
] ++ username(Params) ++ password(Params) ++ ssl_config(SSL);
%% api v2 config
protocol_config(#{
bucket := Bucket,
org := Org,
token := Token,
parameters := #{influxdb_type := influxdb_api_v2, bucket := Bucket, org := Org, token := Token},
ssl := SSL
}) ->
[
@ -501,7 +596,7 @@ to_maps_config(K, V, Res) ->
%% Tags & Fields Data Trans
parse_batch_data(InstId, BatchData, SyntaxLines) ->
{Points, Errors} = lists:foldl(
fun({send_message, Data}, {ListOfPoints, ErrAccIn}) ->
fun({_, Data}, {ListOfPoints, ErrAccIn}) ->
case data_to_points(Data, SyntaxLines) of
{ok, Points} ->
{[Points | ListOfPoints], ErrAccIn};

View File

@ -537,11 +537,11 @@ t_start_ok(Config) ->
begin
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
?assertMatch(ok, send_message(Config, SentData));
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData))
end,
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{
bool => <<"true">>,
@ -594,8 +594,11 @@ t_start_already_started(Config) ->
{ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check(
emqx_bridge_schema, InfluxDBConfigString
),
ConnConfigMap = emqx_bridge_influxdb_connector:transform_bridge_v1_config_to_connector_config(
InfluxDBConfigMap
),
?check_trace(
emqx_bridge_influxdb_connector:on_start(ResourceId, InfluxDBConfigMap),
emqx_bridge_influxdb_connector:on_start(ResourceId, ConnConfigMap),
fun(Result, Trace) ->
?assertMatch({ok, _}, Result),
?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)),
@ -700,11 +703,11 @@ t_const_timestamp(Config) ->
},
case QueryMode of
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500);
?assertMatch(ok, send_message(Config, SentData));
sync ->
?assertMatch({ok, 204, _}, send_message(Config, SentData))
end,
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{foo => <<"123">>},
assert_persisted_data(ClientId, Expected, PersistedData),
@ -762,10 +765,7 @@ t_boolean_variants(Config) ->
async ->
?assertMatch(ok, send_message(Config, SentData))
end,
case QueryMode of
async -> ct:sleep(500);
sync -> ok
end,
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{
bool => atom_to_binary(Translation),
@ -817,9 +817,10 @@ t_any_num_as_float(Config) ->
?assertMatch({ok, 204, _}, send_message(Config, SentData)),
ok;
async ->
?assertMatch(ok, send_message(Config, SentData)),
ct:sleep(500)
?assertMatch(ok, send_message(Config, SentData))
end,
%% sleep is still need even in sync mode, or we would get an empty result sometimes
ct:sleep(1500),
PersistedData = query_by_clientid(ClientId, Config),
Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>},
assert_persisted_data(ClientId, Expected, PersistedData),
@ -938,10 +939,14 @@ t_create_disconnected(Config) ->
?assertMatch({ok, _}, create_bridge(Config))
end),
fun(Trace) ->
?assertMatch(
[#{error := influxdb_client_not_alive, reason := econnrefused}],
?of_kind(influxdb_connector_start_failed, Trace)
),
[#{error := influxdb_client_not_alive, reason := Reason}] =
?of_kind(influxdb_connector_start_failed, Trace),
case Reason of
econnrefused -> ok;
closed -> ok;
{closed, _} -> ok;
_ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason])
end,
ok
end
),
@ -1146,10 +1151,8 @@ t_authentication_error(Config0) ->
ok.
t_authentication_error_on_get_status(Config0) ->
ResourceId = resource_id(Config0),
% Fake initialization to simulate credential update after bridge was created.
emqx_common_test_helpers:with_mock(
ResourceId = emqx_common_test_helpers:with_mock(
influxdb,
check_auth,
fun(_) ->
@ -1165,20 +1168,20 @@ t_authentication_error_on_get_status(Config0) ->
end,
Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}),
{ok, _} = create_bridge(Config),
ResourceId = resource_id(Config0),
?retry(
_Sleep = 1_000,
_Attempts = 10,
?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId))
)
),
ResourceId
end
),
% Now back to wrong credentials
?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)),
ok.
t_authentication_error_on_send_message(Config0) ->
ResourceId = resource_id(Config0),
QueryMode = proplists:get_value(query_mode, Config0, sync),
InfluxDBType = ?config(influxdb_type, Config0),
InfluxConfig0 = proplists:get_value(influxdb_config, Config0),
@ -1198,6 +1201,7 @@ t_authentication_error_on_send_message(Config0) ->
end,
fun() ->
{ok, _} = create_bridge(Config),
ResourceId = resource_id(Config),
?retry(
_Sleep = 1_000,
_Attempts = 10,

View File

@ -66,7 +66,7 @@ t_lifecycle(Config) ->
Port = ?config(influxdb_tcp_port, Config),
perform_lifecycle_check(
<<"emqx_bridge_influxdb_connector_SUITE">>,
influxdb_config(Host, Port, false, <<"verify_none">>)
influxdb_connector_config(Host, Port, false, <<"verify_none">>)
).
perform_lifecycle_check(PoolName, InitialConfig) ->
@ -76,6 +76,7 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
% expects this
FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()},
{ok, #{
id := ResourceId,
state := #{client := #{pool := ReturnedPoolName}} = State,
status := InitialStatus
}} = emqx_resource:create_local(
@ -93,8 +94,18 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
%% install actions to the connector
ActionConfig = influxdb_action_config(),
ChannelId = <<"test_channel">>,
?assertEqual(
ok,
emqx_resource_manager:add_channel(
ResourceId, ChannelId, ActionConfig
)
),
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
% % Perform query as further check that the resource is working as expected
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
?assertEqual(ok, emqx_resource:stop(PoolName)),
% Resource will be listed still, but state will be changed and healthcheck will fail
% as the worker no longer exists.
@ -116,7 +127,15 @@ perform_lifecycle_check(PoolName, InitialConfig) ->
{ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} =
emqx_resource:get_instance(PoolName),
?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)),
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())),
ChannelId = <<"test_channel">>,
?assertEqual(
ok,
emqx_resource_manager:add_channel(
ResourceId, ChannelId, ActionConfig
)
),
?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)),
?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))),
% Stop and remove the resource in one go.
?assertEqual(ok, emqx_resource:remove_local(PoolName)),
?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)),
@ -127,7 +146,7 @@ t_tls_verify_none(Config) ->
PoolName = <<"testpool-1">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>),
InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>),
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
?assertEqual(connected, ValidStatus),
InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail),
@ -138,7 +157,7 @@ t_tls_verify_peer(Config) ->
PoolName = <<"testpool-2">>,
Host = ?config(influxdb_tls_host, Config),
Port = ?config(influxdb_tls_port, Config),
InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>),
InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>),
%% This works without a CA-cert & friends since we are using a mock
ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid),
?assertEqual(connected, ValidStatus),
@ -191,19 +210,30 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) ->
% %% Helpers
% %%------------------------------------------------------------------------------
influxdb_config(Host, Port, SslEnabled, Verify) ->
influxdb_connector_config(Host, Port, SslEnabled, Verify) ->
Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])),
ResourceConfig = #{
<<"bucket">> => <<"mqtt">>,
<<"org">> => <<"emqx">>,
<<"token">> => <<"abcdefg">>,
ConnectorConf = #{
<<"parameters">> => #{
<<"influxdb_type">> => <<"influxdb_api_v2">>,
<<"bucket">> => <<"mqtt">>,
<<"org">> => <<"emqx">>,
<<"token">> => <<"abcdefg">>
},
<<"server">> => Server,
<<"ssl">> => #{
<<"enable">> => SslEnabled,
<<"verify">> => Verify
}
},
#{<<"config">> => ResourceConfig}.
#{<<"config">> => ConnectorConf}.
influxdb_action_config() ->
#{
parameters => #{
write_syntax => influxdb_write_syntax(),
precision => ms
}
}.
custom_verify() ->
fun
@ -227,8 +257,8 @@ influxdb_write_syntax() ->
}
].
test_query() ->
{send_message, #{
test_query(ChannelId) ->
{ChannelId, #{
<<"clientid">> => <<"something">>,
<<"payload">> => #{bool => true},
<<"topic">> => <<"connector_test">>,

View File

@ -1,7 +1,7 @@
%% -*- mode: erlang -*-
{application, emqx_connector, [
{description, "EMQX Data Integration Connectors"},
{vsn, "0.1.36"},
{vsn, "0.1.37"},
{registered, []},
{mod, {emqx_connector_app, []}},
{applications, [

View File

@ -34,6 +34,8 @@ resource_type(matrix) ->
emqx_postgresql;
resource_type(mongodb) ->
emqx_bridge_mongodb_connector;
resource_type(influxdb) ->
emqx_bridge_influxdb_connector;
resource_type(mysql) ->
emqx_bridge_mysql_connector;
resource_type(pgsql) ->
@ -112,6 +114,14 @@ connector_structs() ->
required => false
}
)},
{influxdb,
mk(
hoconsc:map(name, ref(emqx_bridge_influxdb, "config_connector")),
#{
desc => <<"InfluxDB Connector Config">>,
required => false
}
)},
{mysql,
mk(
hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")),
@ -170,6 +180,7 @@ schema_modules() ->
emqx_bridge_kafka,
emqx_bridge_matrix,
emqx_bridge_mongodb,
emqx_bridge_influxdb,
emqx_bridge_mysql,
emqx_bridge_syskeeper_connector,
emqx_bridge_syskeeper_proxy,
@ -196,6 +207,7 @@ api_schemas(Method) ->
api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"),
api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"),
api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"),
api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"),
api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"),
api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method),
api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method),

View File

@ -131,6 +131,8 @@ connector_type_to_bridge_types(matrix) ->
[matrix];
connector_type_to_bridge_types(mongodb) ->
[mongodb, mongodb_rs, mongodb_sharded, mongodb_single];
connector_type_to_bridge_types(influxdb) ->
[influxdb, influxdb_api_v1, influxdb_api_v2];
connector_type_to_bridge_types(mysql) ->
[mysql];
connector_type_to_bridge_types(pgsql) ->

View File

@ -199,7 +199,7 @@ defmodule EMQXUmbrella.MixProject do
defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do
[
{:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true},
{:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.12", override: true},
{:wolff, github: "kafka4beam/wolff", tag: "1.9.1"},
{:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true},
{:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"},

View File

@ -47,4 +47,19 @@ Please note that a placeholder for an integer value must be annotated with a suf
write_syntax.label:
"""Write Syntax"""
action_parameters.label:
"""Action Parameters"""
action_parameters.desc:
"""Additional parameters specific to this action type"""
connector.label:
"""InfluxDB Connector"""
connector.desc:
"""InfluxDB Connector Configs"""
influxdb_action.label:
"""InfluxDB Action"""
influxdb_action.desc:
"""Action to interact with a InfluxDB connector"""
}

View File

@ -68,4 +68,9 @@ username.desc:
username.label:
"""Username"""
influxdb_parameters.label:
"""InfluxDB Type Specific Parameters"""
influxdb_parameters.desc:
"""Set of parameters specific for the given type of this InfluxDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`."""
}