diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src index ef288368d..864d8945f 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.app.src @@ -8,7 +8,7 @@ emqx_resource, influxdb ]}, - {env, []}, + {env, [{emqx_action_info_modules, [emqx_bridge_influxdb_action_info]}]}, {modules, []}, {links, []} ]}. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl index acb295752..8d5360887 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb.erl @@ -11,7 +11,6 @@ -import(hoconsc, [mk/2, enum/1, ref/2]). -export([ - conn_bridge_examples/1, write_syntax_type/0 ]). @@ -22,17 +21,28 @@ desc/1 ]). +%% Examples +-export([ + bridge_v2_examples/1, + conn_bridge_examples/1, + connector_examples/1 +]). + -type write_syntax() :: list(). -reflect_type([write_syntax/0]). -typerefl_from_string({write_syntax/0, ?MODULE, to_influx_lines}). -export([to_influx_lines/1]). +-define(CONNECTOR_TYPE, influxdb). +-define(ACTION_TYPE, influxdb). + %% ------------------------------------------------------------------------------------------------- %% api write_syntax_type() -> typerefl:alias("string", write_syntax()). +%% Examples conn_bridge_examples(Method) -> [ #{ @@ -49,25 +59,80 @@ conn_bridge_examples(Method) -> } ]. +bridge_v2_examples(Method) -> + WriteExample = + <<"${topic},clientid=${clientid} ", "payload=${payload},", + "${clientid}_int_value=${payload.int_key}i,", "bool=${payload.bool}">>, + ParamsExample = #{ + parameters => #{ + write_syntax => WriteExample, precision => ms + } + }, + [ + #{ + <<"influxdb">> => #{ + summary => <<"InfluxDB Action">>, + value => emqx_bridge_v2_schema:action_values( + Method, influxdb, influxdb, ParamsExample + ) + } + } + ]. + +connector_examples(Method) -> + [ + #{ + <<"influxdb_api_v1">> => #{ + summary => <<"InfluxDB HTTP API V1 Connector">>, + value => emqx_connector_schema:connector_values( + Method, influxdb, connector_values(influxdb_api_v1) + ) + } + }, + #{ + <<"influxdb_api_v2">> => #{ + summary => <<"InfluxDB HTTP API V2 Connector">>, + value => emqx_connector_schema:connector_values( + Method, influxdb, connector_values(influxdb_api_v2) + ) + } + } + ]. + +connector_values(Type) -> + maps:merge(basic_connector_values(), #{parameters => connector_values_v(Type)}). + +connector_values_v(influxdb_api_v2) -> + #{ + influxdb_type => influxdb_api_v2, + bucket => <<"example_bucket">>, + org => <<"examlpe_org">>, + token => <<"example_token">> + }; +connector_values_v(influxdb_api_v1) -> + #{ + influxdb_type => influxdb_api_v1, + database => <<"example_database">>, + username => <<"example_username">>, + password => <<"******">> + }. + +basic_connector_values() -> + #{ + enable => true, + server => <<"127.0.0.1:8086">>, + ssl => #{enable => false} + }. + values(Protocol, get) -> values(Protocol, post); values("influxdb_api_v2", post) -> SupportUint = <<"uint_value=${payload.uint_key}u,">>, - TypeOpts = #{ - bucket => <<"example_bucket">>, - org => <<"examlpe_org">>, - token => <<"example_token">>, - server => <<"127.0.0.1:8086">> - }, + TypeOpts = connector_values_v(influxdb_api_v2), values(common, "influxdb_api_v2", SupportUint, TypeOpts); values("influxdb_api_v1", post) -> SupportUint = <<>>, - TypeOpts = #{ - database => <<"example_database">>, - username => <<"example_username">>, - password => <<"******">>, - server => <<"127.0.0.1:8086">> - }, + TypeOpts = connector_values_v(influxdb_api_v1), values(common, "influxdb_api_v1", SupportUint, TypeOpts); values(Protocol, put) -> values(Protocol, post). @@ -98,6 +163,10 @@ namespace() -> "bridge_influxdb". roots() -> []. +fields("config_connector") -> + emqx_connector_schema:common_fields() ++ + connection_fields() ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts); fields("post_api_v1") -> method_fields(post, influxdb_api_v1); fields("post_api_v2") -> @@ -110,12 +179,59 @@ fields("get_api_v1") -> method_fields(get, influxdb_api_v1); fields("get_api_v2") -> method_fields(get, influxdb_api_v2); +fields(action) -> + {influxdb, + mk( + hoconsc:map(name, ref(?MODULE, influxdb_action)), + #{desc => <<"InfluxDB Action Config">>, required => false} + )}; +fields(influxdb_action) -> + emqx_bridge_v2_schema:make_producer_action_schema( + mk(ref(?MODULE, action_parameters), #{ + required => true, desc => ?DESC(action_parameters) + }) + ); +fields(action_parameters) -> + [ + {write_syntax, fun write_syntax/1}, + emqx_bridge_influxdb_connector:precision_field() + ]; +fields(connector_resource_opts) -> + emqx_connector_schema:resource_opts_fields(); +fields(Field) when + Field == "get_connector"; + Field == "put_connector"; + Field == "post_connector" +-> + Fields = + connection_fields() ++ + emqx_connector_schema:resource_opts_ref(?MODULE, connector_resource_opts), + emqx_connector_schema:api_fields(Field, ?CONNECTOR_TYPE, Fields); +fields(Field) when + Field == "get_bridge_v2"; + Field == "post_bridge_v2"; + Field == "put_bridge_v2" +-> + emqx_bridge_v2_schema:api_fields(Field, ?ACTION_TYPE, fields(influxdb_action)); fields(Type) when Type == influxdb_api_v1 orelse Type == influxdb_api_v2 -> influxdb_bridge_common_fields() ++ connector_fields(Type). +connection_fields() -> + [ + emqx_bridge_influxdb_connector:server_field(), + {parameters, + mk( + hoconsc:union([ + ref(emqx_bridge_influxdb_connector, "connector_" ++ T) + || T <- ["influxdb_api_v1", "influxdb_api_v2"] + ]), + #{required => true, desc => ?DESC("influxdb_parameters")} + )} + ] ++ emqx_connector_schema_lib:ssl_fields(). + method_fields(post, ConnectorType) -> influxdb_bridge_common_fields() ++ connector_fields(ConnectorType) ++ @@ -154,6 +270,10 @@ desc(influxdb_api_v1) -> ?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v1"); desc(influxdb_api_v2) -> ?DESC(emqx_bridge_influxdb_connector, "influxdb_api_v2"); +desc(influxdb_action) -> + ?DESC(influxdb_action); +desc(action_parameters) -> + ?DESC(action_parameters); desc(_) -> undefined. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl new file mode 100644 index 000000000..a74e88df6 --- /dev/null +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_action_info.erl @@ -0,0 +1,77 @@ +-module(emqx_bridge_influxdb_action_info). + +-behaviour(emqx_action_info). + +-export([ + bridge_v1_config_to_action_config/2, + bridge_v1_config_to_connector_config/1, + connector_action_config_to_bridge_v1_config/2, + bridge_v1_type_name/0, + action_type_name/0, + connector_type_name/0, + schema_module/0 +]). + +%% dynamic callback +-export([ + bridge_v1_type_name_fun/1 +]). + +-import(emqx_utils_conv, [bin/1]). + +-define(SCHEMA_MODULE, emqx_bridge_influxdb). + +bridge_v1_config_to_action_config(BridgeV1Config, ConnectorName) -> + ActionTopLevelKeys = schema_keys(influxdb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ActionConfig = make_config_map(ActionKeys, ActionParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_bridge_v2_schema:project_to_actions_resource_opts/1, + ActionConfig#{<<"connector">> => ConnectorName} + ). + +bridge_v1_config_to_connector_config(BridgeV1Config) -> + ActionTopLevelKeys = schema_keys(influxdb_action), + ActionParametersKeys = schema_keys(action_parameters), + ActionKeys = ActionTopLevelKeys ++ ActionParametersKeys, + ConnectorTopLevelKeys = schema_keys("config_connector"), + ConnectorKeys = maps:keys(BridgeV1Config) -- (ActionKeys -- ConnectorTopLevelKeys), + ConnectorParametersKeys = ConnectorKeys -- ConnectorTopLevelKeys, + ConnConfig0 = make_config_map(ConnectorKeys, ConnectorParametersKeys, BridgeV1Config), + emqx_utils_maps:update_if_present( + <<"resource_opts">>, + fun emqx_connector_schema:project_to_connector_resource_opts/1, + ConnConfig0 + ). + +connector_action_config_to_bridge_v1_config(ConnectorRawConf, ActionRawConf) -> + RawConf = emqx_action_info:connector_action_config_to_bridge_v1_config( + ConnectorRawConf, ActionRawConf + ), + maps:without([<<"influxdb_type">>], RawConf). + +bridge_v1_type_name() -> + {fun ?MODULE:bridge_v1_type_name_fun/1, bridge_v1_type_names()}. + +action_type_name() -> influxdb. + +connector_type_name() -> influxdb. + +schema_module() -> ?SCHEMA_MODULE. + +bridge_v1_type_name_fun({#{<<"parameters">> := #{<<"influxdb_type">> := Type}}, _}) -> + v1_type(Type). + +v1_type(<<"influxdb_api_v1">>) -> influxdb_api_v1; +v1_type(<<"influxdb_api_v2">>) -> influxdb_api_v2. + +make_config_map(PickKeys, IndentKeys, Config) -> + Conf0 = maps:with(PickKeys, Config), + emqx_utils_maps:indent(<<"parameters">>, IndentKeys, Conf0). + +schema_keys(Name) -> + [bin(Key) || Key <- proplists:get_keys(?SCHEMA_MODULE:fields(Name))]. + +bridge_v1_type_names() -> [influxdb, influxdb_api_v1, influxdb_api_v2]. diff --git a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl index 2b4fb8d74..6f4e21df5 100644 --- a/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl +++ b/apps/emqx_bridge_influxdb/src/emqx_bridge_influxdb_connector.erl @@ -19,6 +19,10 @@ callback_mode/0, on_start/2, on_stop/2, + on_add_channel/4, + on_remove_channel/3, + on_get_channel_status/3, + on_get_channels/1, on_query/3, on_batch_query/3, on_query_async/4, @@ -34,6 +38,8 @@ desc/1 ]). +-export([precision_field/0, server_field/0]). + %% only for test -export([is_unrecoverable_error/1]). @@ -55,6 +61,35 @@ %% resource callback callback_mode() -> async_if_possible. +on_add_channel( + _InstanceId, + #{channels := Channels} = OldState, + ChannelId, + #{parameters := Parameters} = ChannelConfig0 +) -> + #{write_syntax := WriteSytaxTmpl} = Parameters, + Precision = maps:get(precision, Parameters, ms), + ChannelConfig = maps:merge( + Parameters, + ChannelConfig0#{ + write_syntax => to_config(WriteSytaxTmpl, Precision) + } + ), + {ok, OldState#{channels => maps:put(ChannelId, ChannelConfig, Channels)}}. + +on_remove_channel(_InstanceId, #{channels := Channels} = State, ChannelId) -> + NewState = State#{channels => maps:remove(ChannelId, Channels)}, + {ok, NewState}. + +on_get_channel_status(InstanceId, _ChannelId, State) -> + case on_get_status(InstanceId, State) of + connected -> connected; + _ -> connecting + end. + +on_get_channels(InstanceId) -> + emqx_bridge_v2:get_channels_for_connector(InstanceId). + on_start(InstId, Config) -> %% InstID as pool would be handled by influxdb client %% so there is no need to allocate pool_name here @@ -73,8 +108,9 @@ on_stop(InstId, _State) -> ok end. -on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, client := Client}) -> - case data_to_points(Data, SyntaxLines) of +on_query(InstId, {Channel, Message}, #{channels := ChannelConf, client := Client}) -> + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( influxdb_connector_send_query, @@ -92,7 +128,9 @@ on_query(InstId, {send_message, Data}, _State = #{write_syntax := SyntaxLines, c %% Once a Batched Data trans to points failed. %% This batch query failed -on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client := Client}) -> +on_batch_query(InstId, BatchData, #{channels := ChannelConf, client := Client}) -> + [{Channel, _} | _] = BatchData, + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -110,11 +148,12 @@ on_batch_query(InstId, BatchData, _State = #{write_syntax := SyntaxLines, client on_query_async( InstId, - {send_message, Data}, + {Channel, Message}, {ReplyFun, Args}, - _State = #{write_syntax := SyntaxLines, client := Client} + #{channels := ChannelConf, client := Client} ) -> - case data_to_points(Data, SyntaxLines) of + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), + case data_to_points(Message, SyntaxLines) of {ok, Points} -> ?tp( influxdb_connector_send_query, @@ -134,8 +173,10 @@ on_batch_query_async( InstId, BatchData, {ReplyFun, Args}, - #{write_syntax := SyntaxLines, client := Client} + #{channels := ChannelConf, client := Client} ) -> + [{Channel, _} | _] = BatchData, + #{write_syntax := SyntaxLines} = maps:get(Channel, ChannelConf), case parse_batch_data(InstId, BatchData, SyntaxLines) of {ok, Points} -> ?tp( @@ -177,30 +218,51 @@ roots() -> fields(common) -> [ - {server, server()}, - {precision, - %% The influxdb only supports these 4 precision: - %% See "https://github.com/influxdata/influxdb/blob/ - %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for - %% more information. - mk(enum([ns, us, ms, s]), #{ - required => false, default => ms, desc => ?DESC("precision") - })} + server_field(), + precision_field() ]; +fields("connector_influxdb_api_v1") -> + [influxdb_type_field(influxdb_api_v1) | influxdb_api_v1_fields()]; +fields("connector_influxdb_api_v2") -> + [influxdb_type_field(influxdb_api_v2) | influxdb_api_v2_fields()]; fields(influxdb_api_v1) -> - fields(common) ++ - [ - {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, - {username, mk(binary(), #{desc => ?DESC("username")})}, - {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} - ] ++ emqx_connector_schema_lib:ssl_fields(); + fields(common) ++ influxdb_api_v1_fields() ++ emqx_connector_schema_lib:ssl_fields(); fields(influxdb_api_v2) -> - fields(common) ++ - [ - {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, - {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, - {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})} - ] ++ emqx_connector_schema_lib:ssl_fields(). + fields(common) ++ influxdb_api_v2_fields() ++ emqx_connector_schema_lib:ssl_fields(). + +influxdb_type_field(Type) -> + {influxdb_type, #{ + required => true, + type => Type, + default => Type, + desc => ?DESC(atom_to_list(Type)) + }}. +server_field() -> + {server, server()}. + +precision_field() -> + {precision, + %% The influxdb only supports these 4 precision: + %% See "https://github.com/influxdata/influxdb/blob/ + %% 6b607288439a991261307518913eb6d4e280e0a7/models/points.go#L487" for + %% more information. + mk(enum([ns, us, ms, s]), #{ + required => false, default => ms, desc => ?DESC("precision") + })}. + +influxdb_api_v1_fields() -> + [ + {database, mk(binary(), #{required => true, desc => ?DESC("database")})}, + {username, mk(binary(), #{desc => ?DESC("username")})}, + {password, emqx_schema_secret:mk(#{desc => ?DESC("password")})} + ]. + +influxdb_api_v2_fields() -> + [ + {bucket, mk(binary(), #{required => true, desc => ?DESC("bucket")})}, + {org, mk(binary(), #{required => true, desc => ?DESC("org")})}, + {token, emqx_schema_secret:mk(#{required => true, desc => ?DESC("token")})} + ]. server() -> Meta = #{ @@ -216,6 +278,10 @@ desc(common) -> desc(influxdb_api_v1) -> ?DESC("influxdb_api_v1"); desc(influxdb_api_v2) -> + ?DESC("influxdb_api_v2"); +desc("connector_influxdb_api_v1") -> + ?DESC("influxdb_api_v1"); +desc("connector_influxdb_api_v2") -> ?DESC("influxdb_api_v2"). %% ------------------------------------------------------------------------------------------------- @@ -248,22 +314,14 @@ start_client(InstId, Config) -> {error, R} end. -do_start_client( - InstId, - ClientConfig, - Config = #{write_syntax := Lines} -) -> - Precision = maps:get(precision, Config, ms), +do_start_client(InstId, ClientConfig, Config) -> case influxdb:start_client(ClientConfig) of {ok, Client} -> case influxdb:is_alive(Client, true) of true -> case influxdb:check_auth(Client) of ok -> - State = #{ - client => Client, - write_syntax => to_config(Lines, Precision) - }, + State = #{client => Client, channels => #{}}, ?SLOG(info, #{ msg => "starting_influxdb_connector_success", connector => InstId, @@ -333,23 +391,17 @@ client_config( ] ++ protocol_config(Config). %% api v1 config -protocol_config( - #{ - database := DB, - ssl := SSL - } = Config -) -> +protocol_config(#{ + parameters := #{influxdb_type := influxdb_api_v1, database := DB} = Params, ssl := SSL +}) -> [ {protocol, http}, {version, v1}, {database, str(DB)} - ] ++ username(Config) ++ - password(Config) ++ ssl_config(SSL); + ] ++ username(Params) ++ password(Params) ++ ssl_config(SSL); %% api v2 config protocol_config(#{ - bucket := Bucket, - org := Org, - token := Token, + parameters := #{influxdb_type := influxdb_api_v2, bucket := Bucket, org := Org, token := Token}, ssl := SSL }) -> [ @@ -501,7 +553,7 @@ to_maps_config(K, V, Res) -> %% Tags & Fields Data Trans parse_batch_data(InstId, BatchData, SyntaxLines) -> {Points, Errors} = lists:foldl( - fun({send_message, Data}, {ListOfPoints, ErrAccIn}) -> + fun({_, Data}, {ListOfPoints, ErrAccIn}) -> case data_to_points(Data, SyntaxLines) of {ok, Points} -> {[Points | ListOfPoints], ErrAccIn}; diff --git a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl index 4e8618915..6fe08ba4a 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_ee_schema.erl @@ -34,6 +34,8 @@ resource_type(matrix) -> emqx_postgresql; resource_type(mongodb) -> emqx_bridge_mongodb_connector; +resource_type(influxdb) -> + emqx_bridge_influxdb_connector; resource_type(mysql) -> emqx_bridge_mysql_connector; resource_type(pgsql) -> @@ -112,6 +114,14 @@ connector_structs() -> required => false } )}, + {influxdb, + mk( + hoconsc:map(name, ref(emqx_bridge_influxdb, "config_connector")), + #{ + desc => <<"InfluxDB Connector Config">>, + required => false + } + )}, {mysql, mk( hoconsc:map(name, ref(emqx_bridge_mysql, "config_connector")), @@ -170,6 +180,7 @@ schema_modules() -> emqx_bridge_kafka, emqx_bridge_matrix, emqx_bridge_mongodb, + emqx_bridge_influxdb, emqx_bridge_mysql, emqx_bridge_syskeeper_connector, emqx_bridge_syskeeper_proxy, @@ -196,6 +207,7 @@ api_schemas(Method) -> api_ref(emqx_bridge_kafka, <<"kafka_producer">>, Method ++ "_connector"), api_ref(emqx_bridge_matrix, <<"matrix">>, Method ++ "_connector"), api_ref(emqx_bridge_mongodb, <<"mongodb">>, Method ++ "_connector"), + api_ref(emqx_bridge_influxdb, <<"influxdb">>, Method ++ "_connector"), api_ref(emqx_bridge_mysql, <<"mysql">>, Method ++ "_connector"), api_ref(emqx_bridge_syskeeper_connector, <<"syskeeper_forwarder">>, Method), api_ref(emqx_bridge_syskeeper_proxy, <<"syskeeper_proxy">>, Method), diff --git a/apps/emqx_connector/src/schema/emqx_connector_schema.erl b/apps/emqx_connector/src/schema/emqx_connector_schema.erl index f67cd6991..c491fbed0 100644 --- a/apps/emqx_connector/src/schema/emqx_connector_schema.erl +++ b/apps/emqx_connector/src/schema/emqx_connector_schema.erl @@ -131,6 +131,8 @@ connector_type_to_bridge_types(matrix) -> [matrix]; connector_type_to_bridge_types(mongodb) -> [mongodb, mongodb_rs, mongodb_sharded, mongodb_single]; +connector_type_to_bridge_types(influxdb) -> + [influxdb, influxdb_api_v1, influxdb_api_v2]; connector_type_to_bridge_types(mysql) -> [mysql]; connector_type_to_bridge_types(pgsql) -> diff --git a/rel/i18n/emqx_bridge_influxdb.hocon b/rel/i18n/emqx_bridge_influxdb.hocon index 48454bbd3..ed628b325 100644 --- a/rel/i18n/emqx_bridge_influxdb.hocon +++ b/rel/i18n/emqx_bridge_influxdb.hocon @@ -47,4 +47,19 @@ Please note that a placeholder for an integer value must be annotated with a suf write_syntax.label: """Write Syntax""" +action_parameters.label: +"""Action Parameters""" +action_parameters.desc: +"""Additional parameters specific to this action type""" + +influxdb_action.label: +"""InfluxDB Action""" +influxdb_action.desc: +"""Action to interact with a InfluxDB connector""" + +influxdb_parameters.label: +"""InfluxdbDB Type Specific Parameters""" +influxdb_parameters.desc: +"""Set of parameters specific for the given type of this InfluxdbDB connector, `influxdb_type` can be one of `influxdb_api_v1`, `influxdb_api_v1`.""" + }