diff --git a/apps/emqx_bridge/src/emqx_action_info.erl b/apps/emqx_bridge/src/emqx_action_info.erl index e9c51edfa..411ab54f1 100644 --- a/apps/emqx_bridge/src/emqx_action_info.erl +++ b/apps/emqx_bridge/src/emqx_action_info.erl @@ -80,6 +80,7 @@ hard_coded_action_info_modules_ee() -> emqx_bridge_kafka_action_info, emqx_bridge_matrix_action_info, emqx_bridge_mongodb_action_info, + emqx_bridge_influxdb_action_info, emqx_bridge_mysql_action_info, emqx_bridge_pgsql_action_info, emqx_bridge_syskeeper_action_info, diff --git a/apps/emqx_bridge/src/emqx_bridge.app.src b/apps/emqx_bridge/src/emqx_bridge.app.src index d9bded51b..cadbf35a0 100644 --- a/apps/emqx_bridge/src/emqx_bridge.app.src +++ b/apps/emqx_bridge/src/emqx_bridge.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_bridge, [ {description, "EMQX bridges"}, - {vsn, "0.1.32"}, + {vsn, "0.1.33"}, {registered, [emqx_bridge_sup]}, {mod, {emqx_bridge_app, []}}, {applications, [ diff --git a/apps/emqx_bridge_influxdb/rebar.config b/apps/emqx_bridge_influxdb/rebar.config index 8acb242eb..c6ad26ac1 100644 --- a/apps/emqx_bridge_influxdb/rebar.config +++ b/apps/emqx_bridge_influxdb/rebar.config @@ -3,7 +3,7 @@ {erl_opts, [debug_info]}. {deps, [ - {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.11"}}}, + {influxdb, {git, "https://github.com/emqx/influxdb-client-erl", {tag, "1.1.12"}}}, {emqx_connector, {path, "../../apps/emqx_connector"}}, {emqx_resource, {path, "../../apps/emqx_resource"}}, {emqx_bridge, {path, "../../apps/emqx_bridge"}} diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index ef288368d..de58c2170 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -1,6 +1,6 @@ {application, emqx_bridge_influxdb, [ {description, "EMQX Enterprise InfluxDB Bridge"}, - {vsn, "0.1.7"}, + {vsn, "0.2.0"}, {registered, []}, {applications, [ kernel, @@ -8,7 +8,7 @@ emqx_resource, influxdb ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_influxdb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index acb295752..4228d23d5 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -11,7 +11,6 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, write_syntax_type/0 ]). @@ -22,17 +21,28 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + -type write_syntax() :: list(). -reflect_type([write_syntax/0]). -typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}). -export([to_influx_lines/1]). +-define(CONNECTOR_TYPE, influxdb). +-define(ACTION_TYPE, influxdb). + %% ------------------------------------------------------------------------------------------------- %% api write_syntax_type() -> typerefl:alias("string", write_syntax()). +%% Examples conn_bridge_examples(Method) -> [ #{ @@ -49,25 +59,80 @@ conn_bridge_examples(Method) -> } ]. +bridge_v2_examples(Method) -> + WriteExample = + <<"${topic},clientid=${clientid} ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", "bool=${payload.bool}">>, + ParamsExample = #{ + parameters => #{ + write_syntax => WriteExample, precision => ms + } + }, + [ + #{ + <<"influxdb">> => #{ + summary => <<"InfluxDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, influxdb, influxdb, ParamsExample + ) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"influxdb_api_v1">> => #{ + summary => <<"InfluxDB HTTP API V1 Connector">>, + value => emqx_connector_schema:connector_values( + Method, influxdb, connector_values(influxdb_api_v1) + ) + } + }, + #{ + <<"influxdb_api_v2">> => #{ + summary => <<"InfluxDB HTTP API V2 Connector">>, + value => emqx_connector_schema:connector_values( + Method, influxdb, connector_values(influxdb_api_v2) + ) + } + } + ]. + +connector_values(Type) -> + maps:merge(basic_connector_values(), #{parameters => connector_values_v(Type)}). + +connector_values_v(influxdb_api_v2) -> + #{ + influxdb_type => influxdb_api_v2, + bucket => <<"example_bucket">>, + org => <<"examlpe_org">>, + token => <<"example_token">> + }; +connector_values_v(influxdb_api_v1) -> + #{ + influxdb_type => influxdb_api_v1, + database => <<"example_database">>, + username => <<"example_username">>, + password => <<"******">> + }. + +basic_connector_values() -> + #{ + enable => true, + server => <<"127.0.0.1:8086">>, + ssl => #{enable => false} + }. + values(Protocol, get) -> values(Protocol, post); values("influxdb_api_v2", post) -> SupportUint = <<"uint_value=${payload.uint_key}u,">>, - TypeOpts = #{ - bucket => <<"example_bucket">>, - org => <<"examlpe_org">>, - token => <<"example_token">>, - server => <<"127.0.0.1:8086">> - }, + TypeOpts = connector_values_v(influxdb_api_v2), values(common, "influxdb_api_v2", SupportUint, TypeOpts); values("influxdb_api_v1", post) -> SupportUint = <<>>, - TypeOpts = #{ - database => <<"example_database">>, - username => <<"example_username">>, - password => <<"******">>, - server => <<"127.0.0.1:8086">> - }, + TypeOpts = connector_values_v(influxdb_api_v1), values(common, "influxdb_api_v1", SupportUint, TypeOpts); values(Protocol, put) -> values(Protocol, post). @@ -98,6 +163,10 @@ namespace() -> "bridge_influxdb". roots() -> []. +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + emqx_bridge_influxdb_connector:fields("connector") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("post_api_v1") -> method_fields(post, influxdb_api_v1); fields("post_api_v2") -> @@ -110,6 +179,40 @@ fields("get_api_v1") -> method_fields(get, influxdb_api_v1); fields("get_api_v2") -> method_fields(get, influxdb_api_v2); +fields(action) -> + {influxdb, + mk( + hoconsc:map(name, ref(?MODULE, influxdb_action)), + #{desc => <<"InfluxDB Action Config">>, required => false} + )}; +fields(influxdb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + {write_syntax, fun write_syntax/1}, + emqx_bridge_influxdb_connector:precision_field() + ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + emqx_bridge_influxdb_connector:fields("connector") ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(influxdb_action)); fields(Type) when Type == influxdb_api_v1 orelse Type == influxdb_api_v2 -> @@ -154,6 +257,14 @@ desc(influxdb_api_v1) -> ?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1"); desc(influxdb_api_v2) -> ?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2"); +desc(influxdb_action) -> + ?DESC(influxdb_action); +desc(action_parameters) -> + ?DESC(action_parameters); +desc("config_connector") -> + ?DESC("desc_config"); +desc(connector_resource_opts) -> + ?DESC(emqx_resource_schema, "resource_opts"); desc(_) -> undefined. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl new file mode 100644 index 000000000..00a6c5510 --- /dev/null +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl @@ -0,0 +1,76 @@ +-module(emqx_bridge_influxdb_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +%% dynamic callback +-export([ + bridge_v1_type_name_fun/1 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_influxdb). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(influxdb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(influxdb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, + ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig0 + ). + +connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) -> + RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorRawConf, ActionRawConf + ), + maps:without([<<"influxdb_type">>], RawConf). + +bridge_v1_type_name() -> + {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}. + +action_type_name() -> influxdb. + +connector_type_name() -> influxdb. + +schema_module() -> ?SCHEMA_MODULE. + +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"database">> := _}}, _}) -> + influxdb_api_v1; +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"bucket">> := _}}, _}) -> + influxdb_api_v2. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. + +bridge_v1_type_names() -> [influxdb, influxdb_api_v1, influxdb_api_v2]. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 2b4fb8d74..7a84bc440 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -19,6 +19,10 @@ callback_mode/0, on_start/2, on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1, on_query/3, on_batch_query/3, on_query_async/4, @@ -34,6 +38,10 @@ desc/1 ]). +-export([transform_bridge_v1_config_to_connector_config/1]). + +-export([precision_field/0]). + %% only for test -export([is_unrecoverable_error/1]). @@ -55,6 +63,38 @@ %% resource callback callback_mode() -> async_if_possible. +on_add_channel( + _InstanceId, + #{channels := Channels, client := Client} = OldState, + ChannelId, + #{parameters := Parameters} = ChannelConfig0 +) -> + #{write_syntax := WriteSytaxTmpl} = Parameters, + Precision = maps:get(precision, Parameters, ms), + ChannelConfig = maps:merge( + Parameters, + ChannelConfig0#{ + channel_client => influxdb:update_precision(Client, Precision), + write_syntax => to_config(WriteSytaxTmpl, Precision) + } + ), + {ok, OldState#{ + channels => maps:put(ChannelId, ChannelConfig, Channels) + }}. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + NewState = State#{channels => maps:remove(ChannelId, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, _ChannelId, State) -> + case on_get_status(InstanceId, State) of + connected -> connected; + _ -> connecting + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + on_start(InstId, Config) -> %% InstID as pool would be handled by influxdb client %% so there is no need to allocate pool_name here @@ -73,8 +113,10 @@ on_stop(InstId, _State) -> ok end. -on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> - case data_to_points(Data, SyntaxLines) of +on_query(InstId, {Channel, Message}, #{channels := ChannelConf}) -> + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), + case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( influxdb_connector_send_query, @@ -92,7 +134,10 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> +on_batch_query(InstId, BatchData, #{channels := ChannelConf}) -> + [{Channel, _} | _] = BatchData, + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -110,11 +155,13 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client on_query_async( InstId, - {send_message, Data}, + {Channel, Message}, {ReplyFun, Args}, - _State = #{write_syntax := SyntaxLines, client := Client} + #{channels := ChannelConf} ) -> - case data_to_points(Data, SyntaxLines) of + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), + case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( influxdb_connector_send_query, @@ -134,8 +181,11 @@ on_batch_query_async( InstId, BatchData, {ReplyFun, Args}, - #{write_syntax := SyntaxLines, client := Client} + #{channels := ChannelConf} ) -> + [{Channel, _} | _] = BatchData, + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + #{channel_client := Client} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -159,6 +209,22 @@ on_get_status(_InstId, #{client := Client}) -> disconnected end. +transform_bridge_v1_config_to_connector_config(BridgeV1Config) -> + IndentKeys = [username, password, database, token, bucket, org], + ConnConfig0 = maps:without([write_syntax, precision], BridgeV1Config), + ConnConfig1 = + case emqx_utils_maps:indent(parameters, IndentKeys, ConnConfig0) of + #{parameters := #{database := _} = Params} = Conf -> + Conf#{parameters => Params#{influxdb_type => influxdb_api_v1}}; + #{parameters := #{bucket := _} = Params} = Conf -> + Conf#{parameters => Params#{influxdb_type => influxdb_api_v2}} + end, + emqx_utils_maps:update_if_present( + resource_opts, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig1 + ). + %% ------------------------------------------------------------------------------------------------- %% schema namespace() -> connector_influxdb. @@ -166,41 +232,75 @@ namespace() -> connector_influxdb. roots() -> [ {config, #{ - type => hoconsc:union( - [ - hoconsc:ref(?MODULE, influxdb_api_v1), - hoconsc:ref(?MODULE, influxdb_api_v2) - ] - ) + type => hoconsc:ref(?MODULE, "connector") }} ]. +fields("connector") -> + [ + server_field(), + parameter_field() + ] ++ emqx_connector_schema_lib:ssl_fields(); +fields("connector_influxdb_api_v1") -> + [influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()]; +fields("connector_influxdb_api_v2") -> + [influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()]; +%% ============ begin: schema for old bridge configs ============ +fields(influxdb_api_v1) -> + fields(common) ++ influxdb_api_v1_fields(); +fields(influxdb_api_v2) -> + fields(common) ++ influxdb_api_v2_fields(); fields(common) -> [ - {server, server()}, - {precision, - %% The influxdb only supports these 4 precision: - %% See "https://github.com/influxdata/influxdb/blob/ - %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for - %% more information. - mk(enum([ns, us, ms, s]), #{ - required => false, default => ms, desc => ?DESC("precision") - })} - ]; -fields(influxdb_api_v1) -> - fields(common) ++ - [ - {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, - {username, mk(binary(), #{desc => ?DESC("username")})}, - {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} - ] ++ emqx_connector_schema_lib:ssl_fields(); -fields(influxdb_api_v2) -> - fields(common) ++ - [ - {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, - {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, - {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})} - ] ++ emqx_connector_schema_lib:ssl_fields(). + server_field(), + precision_field() + ] ++ emqx_connector_schema_lib:ssl_fields(). +%% ============ end: schema for old bridge configs ============ + +influxdb_type_field(Type) -> + {influxdb_type, #{ + required => true, + type => Type, + default => Type, + desc => ?DESC(atom_to_list(Type)) + }}. + +server_field() -> + {server, server()}. + +precision_field() -> + {precision, + %% The influxdb only supports these 4 precision: + %% See "https://github.com/influxdata/influxdb/blob/ + %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for + %% more information. + mk(enum([ns, us, ms, s]), #{ + required => false, default => ms, desc => ?DESC("precision") + })}. + +parameter_field() -> + {parameters, + mk( + hoconsc:union([ + ref(?MODULE, "connector_" ++ T) + || T <- ["influxdb_api_v1", "influxdb_api_v2"] + ]), + #{required => true, desc => ?DESC("influxdb_parameters")} + )}. + +influxdb_api_v1_fields() -> + [ + {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} + ]. + +influxdb_api_v2_fields() -> + [ + {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, + {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, + {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})} + ]. server() -> Meta = #{ @@ -213,9 +313,19 @@ server() -> desc(common) -> ?DESC("common"); +desc(parameters) -> + ?DESC("influxdb_parameters"); +desc("influxdb_parameters") -> + ?DESC("influxdb_parameters"); desc(influxdb_api_v1) -> ?DESC("influxdb_api_v1"); desc(influxdb_api_v2) -> + ?DESC("influxdb_api_v2"); +desc("connector") -> + ?DESC("connector"); +desc("connector_influxdb_api_v1") -> + ?DESC("influxdb_api_v1"); +desc("connector_influxdb_api_v2") -> ?DESC("influxdb_api_v2"). %% ------------------------------------------------------------------------------------------------- @@ -248,22 +358,14 @@ start_client(InstId, Config) -> {error, R} end. -do_start_client( - InstId, - ClientConfig, - Config = #{write_syntax := Lines} -) -> - Precision = maps:get(precision, Config, ms), +do_start_client(InstId, ClientConfig, Config) -> case influxdb:start_client(ClientConfig) of {ok, Client} -> case influxdb:is_alive(Client, true) of true -> case influxdb:check_auth(Client) of ok -> - State = #{ - client => Client, - write_syntax => to_config(Lines, Precision) - }, + State = #{client => Client, channels => #{}}, ?SLOG(info, #{ msg => "starting_influxdb_connector_success", connector => InstId, @@ -328,28 +430,21 @@ client_config( {host, str(Host)}, {port, Port}, {pool_size, erlang:system_info(schedulers)}, - {pool, InstId}, - {precision, atom_to_binary(maps:get(precision, Config, ms), utf8)} + {pool, InstId} ] ++ protocol_config(Config). %% api v1 config -protocol_config( - #{ - database := DB, - ssl := SSL - } = Config -) -> +protocol_config(#{ + parameters := #{influxdb_type := influxdb_api_v1, database := DB} = Params, ssl := SSL +}) -> [ {protocol, http}, {version, v1}, {database, str(DB)} - ] ++ username(Config) ++ - password(Config) ++ ssl_config(SSL); + ] ++ username(Params) ++ password(Params) ++ ssl_config(SSL); %% api v2 config protocol_config(#{ - bucket := Bucket, - org := Org, - token := Token, + parameters := #{influxdb_type := influxdb_api_v2, bucket := Bucket, org := Org, token := Token}, ssl := SSL }) -> [ @@ -501,7 +596,7 @@ to_maps_config(K, V, Res) -> %% Tags & Fields Data Trans parse_batch_data(InstId, BatchData, SyntaxLines) -> {Points, Errors} = lists:foldl( - fun({send_message, Data}, {ListOfPoints, ErrAccIn}) -> + fun({_, Data}, {ListOfPoints, ErrAccIn}) -> case data_to_points(Data, SyntaxLines) of {ok, Points} -> {[Points | ListOfPoints], ErrAccIn}; diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl index c0d63002b..d79139f17 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_SUITE.erl @@ -537,11 +537,11 @@ t_start_ok(Config) -> begin case QueryMode of async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500); + ?assertMatch(ok, send_message(Config, SentData)); sync -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{ bool => <<"true">>, @@ -594,8 +594,11 @@ t_start_already_started(Config) -> {ok, #{bridges := #{TypeAtom := #{NameAtom := InfluxDBConfigMap}}}} = emqx_hocon:check( emqx_bridge_schema, InfluxDBConfigString ), + ConnConfigMap = emqx_bridge_influxdb_connector:transform_bridge_v1_config_to_connector_config( + InfluxDBConfigMap + ), ?check_trace( - emqx_bridge_influxdb_connector:on_start(ResourceId, InfluxDBConfigMap), + emqx_bridge_influxdb_connector:on_start(ResourceId, ConnConfigMap), fun(Result, Trace) -> ?assertMatch({ok, _}, Result), ?assertMatch([_], ?of_kind(influxdb_connector_start_already_started, Trace)), @@ -700,11 +703,11 @@ t_const_timestamp(Config) -> }, case QueryMode of async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500); + ?assertMatch(ok, send_message(Config, SentData)); sync -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)) end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{foo => <<"123">>}, assert_persisted_data(ClientId, Expected, PersistedData), @@ -762,10 +765,7 @@ t_boolean_variants(Config) -> async -> ?assertMatch(ok, send_message(Config, SentData)) end, - case QueryMode of - async -> ct:sleep(500); - sync -> ok - end, + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{ bool => atom_to_binary(Translation), @@ -817,9 +817,10 @@ t_any_num_as_float(Config) -> ?assertMatch({ok, 204, _}, send_message(Config, SentData)), ok; async -> - ?assertMatch(ok, send_message(Config, SentData)), - ct:sleep(500) + ?assertMatch(ok, send_message(Config, SentData)) end, + %% sleep is still need even in sync mode, or we would get an empty result sometimes + ct:sleep(1500), PersistedData = query_by_clientid(ClientId, Config), Expected = #{float_no_dp => <<"123">>, float_dp => <<"123">>}, assert_persisted_data(ClientId, Expected, PersistedData), @@ -938,10 +939,14 @@ t_create_disconnected(Config) -> ?assertMatch({ok, _}, create_bridge(Config)) end), fun(Trace) -> - ?assertMatch( - [#{error := influxdb_client_not_alive, reason := econnrefused}], - ?of_kind(influxdb_connector_start_failed, Trace) - ), + [#{error := influxdb_client_not_alive, reason := Reason}] = + ?of_kind(influxdb_connector_start_failed, Trace), + case Reason of + econnrefused -> ok; + closed -> ok; + {closed, _} -> ok; + _ -> ct:fail("influxdb_client_not_alive with wrong reason: ~p", [Reason]) + end, ok end ), @@ -1146,10 +1151,8 @@ t_authentication_error(Config0) -> ok. t_authentication_error_on_get_status(Config0) -> - ResourceId = resource_id(Config0), - % Fake initialization to simulate credential update after bridge was created. - emqx_common_test_helpers:with_mock( + ResourceId = emqx_common_test_helpers:with_mock( influxdb, check_auth, fun(_) -> @@ -1165,20 +1168,20 @@ t_authentication_error_on_get_status(Config0) -> end, Config = lists:keyreplace(influxdb_config, 1, Config0, {influxdb_config, InfluxConfig}), {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config0), ?retry( _Sleep = 1_000, _Attempts = 10, ?assertEqual({ok, connected}, emqx_resource_manager:health_check(ResourceId)) - ) + ), + ResourceId end ), - % Now back to wrong credentials ?assertEqual({ok, disconnected}, emqx_resource_manager:health_check(ResourceId)), ok. t_authentication_error_on_send_message(Config0) -> - ResourceId = resource_id(Config0), QueryMode = proplists:get_value(query_mode, Config0, sync), InfluxDBType = ?config(influxdb_type, Config0), InfluxConfig0 = proplists:get_value(influxdb_config, Config0), @@ -1198,6 +1201,7 @@ t_authentication_error_on_send_message(Config0) -> end, fun() -> {ok, _} = create_bridge(Config), + ResourceId = resource_id(Config), ?retry( _Sleep = 1_000, _Attempts = 10, diff --git a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl index 94e8e3fad..915662b24 100644 --- a/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl +++ b/apps/emqx_bridge_influxdb/test/emqx_bridge_influxdb_connector_SUITE.erl @@ -66,7 +66,7 @@ t_lifecycle(Config) -> Port = ?config(influxdb_tcp_port, Config), perform_lifecycle_check( <<"emqx_bridge_influxdb_connector_SUITE">>, - influxdb_config(Host, Port, false, <<"verify_none">>) + influxdb_connector_config(Host, Port, false, <<"verify_none">>) ). perform_lifecycle_check(PoolName, InitialConfig) -> @@ -76,6 +76,7 @@ perform_lifecycle_check(PoolName, InitialConfig) -> % expects this FullConfig = CheckedConfig#{write_syntax => influxdb_write_syntax()}, {ok, #{ + id := ResourceId, state := #{client := #{pool := ReturnedPoolName}} = State, status := InitialStatus }} = emqx_resource:create_local( @@ -93,8 +94,18 @@ perform_lifecycle_check(PoolName, InitialConfig) -> }} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), + %% install actions to the connector + ActionConfig = influxdb_action_config(), + ChannelId = <<"test_channel">>, + ?assertEqual( + ok, + emqx_resource_manager:add_channel( + ResourceId, ChannelId, ActionConfig + ) + ), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), % % Perform query as further check that the resource is working as expected - ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))), ?assertEqual(ok, emqx_resource:stop(PoolName)), % Resource will be listed still, but state will be changed and healthcheck will fail % as the worker no longer exists. @@ -116,7 +127,15 @@ perform_lifecycle_check(PoolName, InitialConfig) -> {ok, ?CONNECTOR_RESOURCE_GROUP, #{status := InitialStatus}} = emqx_resource:get_instance(PoolName), ?assertEqual({ok, connected}, emqx_resource:health_check(PoolName)), - ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query())), + ChannelId = <<"test_channel">>, + ?assertEqual( + ok, + emqx_resource_manager:add_channel( + ResourceId, ChannelId, ActionConfig + ) + ), + ?assertMatch(#{status := connected}, emqx_resource:channel_health_check(ResourceId, ChannelId)), + ?assertMatch({ok, 204, _}, emqx_resource:query(PoolName, test_query(ChannelId))), % Stop and remove the resource in one go. ?assertEqual(ok, emqx_resource:remove_local(PoolName)), ?assertEqual({error, not_found}, ecpool:stop_sup_pool(ReturnedPoolName)), @@ -127,7 +146,7 @@ t_tls_verify_none(Config) -> PoolName = <<"testpool-1">>, Host = ?config(influxdb_tls_host, Config), Port = ?config(influxdb_tls_port, Config), - InitialConfig = influxdb_config(Host, Port, true, <<"verify_none">>), + InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_none">>), ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ?assertEqual(connected, ValidStatus), InvalidStatus = perform_tls_opts_check(PoolName, InitialConfig, fail), @@ -138,7 +157,7 @@ t_tls_verify_peer(Config) -> PoolName = <<"testpool-2">>, Host = ?config(influxdb_tls_host, Config), Port = ?config(influxdb_tls_port, Config), - InitialConfig = influxdb_config(Host, Port, true, <<"verify_peer">>), + InitialConfig = influxdb_connector_config(Host, Port, true, <<"verify_peer">>), %% This works without a CA-cert & friends since we are using a mock ValidStatus = perform_tls_opts_check(PoolName, InitialConfig, valid), ?assertEqual(connected, ValidStatus), @@ -191,19 +210,30 @@ perform_tls_opts_check(PoolName, InitialConfig, VerifyReturn) -> % %% Helpers % %%------------------------------------------------------------------------------ -influxdb_config(Host, Port, SslEnabled, Verify) -> +influxdb_connector_config(Host, Port, SslEnabled, Verify) -> Server = list_to_binary(io_lib:format("~s:~b", [Host, Port])), - ResourceConfig = #{ - <<"bucket">> => <<"mqtt">>, - <<"org">> => <<"emqx">>, - <<"token">> => <<"abcdefg">>, + ConnectorConf = #{ + <<"parameters">> => #{ + <<"influxdb_type">> => <<"influxdb_api_v2">>, + <<"bucket">> => <<"mqtt">>, + <<"org">> => <<"emqx">>, + <<"token">> => <<"abcdefg">> + }, <<"server">> => Server, <<"ssl">> => #{ <<"enable">> => SslEnabled, <<"verify">> => Verify } }, - #{<<"config">> => ResourceConfig}. + #{<<"config">> => ConnectorConf}. + +influxdb_action_config() -> + #{ + parameters => #{ + write_syntax => influxdb_write_syntax(), + precision => ms + } + }. custom_verify() -> fun @@ -227,8 +257,8 @@ influxdb_write_syntax() -> } ]. -test_query() -> - {send_message, #{ +test_query(ChannelId) -> + {ChannelId, #{ <<"clientid">> => <<"something">>, <<"payload">> => #{bool => true}, <<"topic">> => <<"connector_test">>, diff --git a/apps/emqx_connector/src/emqx_connector.app.src b/apps/emqx_connector/src/emqx_connector.app.src index 3e781dae0..261c70d34 100644 --- a/apps/emqx_connector/src/emqx_connector.app.src +++ b/apps/emqx_connector/src/emqx_connector.app.src @@ -1,7 +1,7 @@ %% -*- mode: erlang -*- {application, emqx_connector, [ {description, "EMQX Data Integration Connectors"}, - {vsn, "0.1.36"}, + {vsn, "0.1.37"}, {registered, []}, {mod, {emqx_connector_app, []}}, {applications, [ diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 4e8618915..6fe08ba4a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -34,6 +34,8 @@ resource_type(matrix) -> emqx_postgresql; resource_type(mongodb) -> emqx_bridge_mongodb_connector; +resource_type(influxdb) -> + emqx_bridge_influxdb_connector; resource_type(mysql) -> emqx_bridge_mysql_connector; resource_type(pgsql) -> @@ -112,6 +114,14 @@ connector_structs() -> required => false } )}, + {influxdb, + mk( + hoconsc:map(name, ref(emqx_bridge_influxdb, "config_connector")), + #{ + desc => <<"InfluxDB Connector Config">>, + required => false + } + )}, {mysql, mk( hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), @@ -170,6 +180,7 @@ schema_modules() -> emqx_bridge_kafka, emqx_bridge_matrix, emqx_bridge_mongodb, + emqx_bridge_influxdb, emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, @@ -196,6 +207,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), + api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index f67cd6991..c491fbed0 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -131,6 +131,8 @@ connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(influxdb) -> + [influxdb, influxdb_api_v1, influxdb_api_v2]; connector_type_to_bridge_types(mysql) -> [mysql]; connector_type_to_bridge_types(pgsql) -> diff --git a/mix.exs b/mix.exs index 26617d54b..2be11e957 100644 --- a/mix.exs +++ b/mix.exs @@ -199,7 +199,7 @@ defmodule EMQXUmbrella.MixProject do defp enterprise_deps(_profile_info = %{edition_type: :enterprise}) do [ {:hstreamdb_erl, github: "hstreamdb/hstreamdb_erl", tag: "0.4.5+v0.16.1"}, - {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.11", override: true}, + {:influxdb, github: "emqx/influxdb-client-erl", tag: "1.1.12", override: true}, {:wolff, github: "kafka4beam/wolff", tag: "1.9.1"}, {:kafka_protocol, github: "kafka4beam/kafka_protocol", tag: "4.1.3", override: true}, {:brod_gssapi, github: "kafka4beam/brod_gssapi", tag: "v0.1.1"}, diff --git a/rel/i18n/emqx_bridge_influxdb.hocon b/rel/i18n/emqx_bridge_influxdb.hocon index 48454bbd3..5a81334db 100644 --- a/rel/i18n/emqx_bridge_influxdb.hocon +++ b/rel/i18n/emqx_bridge_influxdb.hocon @@ -47,4 +47,19 @@ Please note that a placeholder for an integer value must be annotated with a suf write_syntax.label: """Write Syntax""" +action_parameters.label: +"""Action Parameters""" +action_parameters.desc: +"""Additional parameters specific to this action type""" + +connector.label: +"""InfluxDB Connector""" +connector.desc: +"""InfluxDB Connector Configs""" + +influxdb_action.label: +"""InfluxDB Action""" +influxdb_action.desc: +"""Action to interact with a InfluxDB connector""" + } diff --git a/rel/i18n/emqx_bridge_influxdb_connector.hocon b/rel/i18n/emqx_bridge_influxdb_connector.hocon index ce79c2a93..4020afd6f 100644 --- a/rel/i18n/emqx_bridge_influxdb_connector.hocon +++ b/rel/i18n/emqx_bridge_influxdb_connector.hocon @@ -68,4 +68,9 @@ username.desc: username.label: """Username""" +influxdb_parameters.label: +"""InfluxDB Type Specific Parameters""" +influxdb_parameters.desc: +"""Set of parameters specific for the given type of this InfluxDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`.""" + }